this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11 "time"
12
13 "github.com/sotangled/tangled/api/tangled"
14 "github.com/sotangled/tangled/knotserver/db"
15 "github.com/sotangled/tangled/knotserver/jsclient"
16 "github.com/sotangled/tangled/log"
17)
18
19func (h *Handle) StartJetstream(ctx context.Context) error {
20 l := h.l.With("component", "jetstream")
21 ctx = log.IntoContext(ctx, l)
22 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
23 dids := []string{}
24
25 lastTimeUs, err := h.getLastTimeUs(ctx)
26 if err != nil {
27 return err
28 }
29
30 h.js = jsclient.NewJetstreamClient(collections, dids)
31 messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
32 if err != nil {
33 return fmt.Errorf("failed to read from jetstream: %w", err)
34 }
35
36 go h.processMessages(ctx, messages)
37
38 return nil
39}
40
41func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) {
42 l := log.FromContext(ctx)
43 lastTimeUs, err := h.db.GetLastTimeUs()
44 if err != nil {
45 l.Info("couldn't get last time us, starting from now")
46 lastTimeUs = time.Now().UnixMicro()
47 }
48
49 // If last time is older than a week, start from now
50 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
51 lastTimeUs = time.Now().UnixMicro()
52 l.Info("last time us is older than a week. discarding that and starting from now")
53 err = h.db.SaveLastTimeUs(lastTimeUs)
54 if err != nil {
55 l.Error("failed to save last time us")
56 }
57 }
58
59 l.Info("found last time_us", "time_us", lastTimeUs)
60 return lastTimeUs, nil
61}
62
63func (h *Handle) processPublicKey(ctx context.Context, did string, record map[string]interface{}) error {
64 l := log.FromContext(ctx)
65 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
66 l.Error("failed to add public key", "error", err)
67 return fmt.Errorf("failed to add public key: %w", err)
68 }
69 l.Info("added public key from firehose", "did", did)
70 return nil
71}
72
73func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
74 l := log.FromContext(ctx)
75
76 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
77 if err != nil {
78 l.Error("error building endpoint url", "did", did, "error", err.Error())
79 return fmt.Errorf("error building endpoint url: %w", err)
80 }
81
82 resp, err := http.Get(keysEndpoint)
83 if err != nil {
84 l.Error("error getting keys", "did", did, "error", err)
85 return fmt.Errorf("error getting keys: %w", err)
86 }
87 defer resp.Body.Close()
88
89 plaintext, err := io.ReadAll(resp.Body)
90 if err != nil {
91 l.Error("error reading response body", "error", err)
92 return fmt.Errorf("error reading response body: %w", err)
93 }
94
95 for _, key := range strings.Split(string(plaintext), "\n") {
96 if key == "" {
97 continue
98 }
99 pk := db.PublicKey{
100 Did: did,
101 }
102 pk.Key = key
103 if err := h.db.AddPublicKey(pk); err != nil {
104 l.Error("failed to add public key", "error", err)
105 return fmt.Errorf("failed to add public key: %w", err)
106 }
107 }
108 return nil
109}
110
111func (h *Handle) processKnotMember(ctx context.Context, did string, record map[string]interface{}) error {
112 l := log.FromContext(ctx)
113
114 if record["domain"] != h.c.Server.Hostname {
115 l.Error("domain mismatch", "domain", record["domain"], "expected", h.c.Server.Hostname)
116 return fmt.Errorf("domain mismatch: %s != %s", record["domain"], h.c.Server.Hostname)
117 }
118
119 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
120 if err != nil || !ok {
121 l.Error("failed to add member", "did", did)
122 return fmt.Errorf("failed to enforce permissions: %w", err)
123 }
124
125 l.Info("adding member")
126 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil {
127 l.Error("failed to add member", "error", err)
128 return fmt.Errorf("failed to add member: %w", err)
129 }
130 l.Info("added member from firehose", "member", record["member"])
131
132 if err := h.db.AddDid(did); err != nil {
133 l.Error("failed to add did", "error", err)
134 return fmt.Errorf("failed to add did: %w", err)
135 }
136
137 if err := h.fetchAndAddKeys(ctx, did); err != nil {
138 return fmt.Errorf("failed to fetch and add keys: %w", err)
139 }
140
141 h.js.UpdateDids([]string{did})
142 return nil
143}
144
145func (h *Handle) processMessages(ctx context.Context, messages <-chan []byte) {
146 l := log.FromContext(ctx)
147 l.Info("waiting for knot to be initialized")
148 <-h.init
149 l.Info("initialized jetstream watcher")
150
151 for msg := range messages {
152 var data map[string]interface{}
153 if err := json.Unmarshal(msg, &data); err != nil {
154 l.Error("error unmarshaling message", "error", err)
155 continue
156 }
157
158 if kind, ok := data["kind"].(string); ok && kind == "commit" {
159 commit := data["commit"].(map[string]interface{})
160 did := data["did"].(string)
161 record := commit["record"].(map[string]interface{})
162
163 var processErr error
164 switch commit["collection"].(string) {
165 case tangled.PublicKeyNSID:
166 if err := h.processPublicKey(ctx, did, record); err != nil {
167 processErr = fmt.Errorf("failed to process public key: %w", err)
168 }
169 case tangled.KnotMemberNSID:
170 if err := h.processKnotMember(ctx, did, record); err != nil {
171 processErr = fmt.Errorf("failed to process knot member: %w", err)
172 }
173 }
174
175 if processErr != nil {
176 l.Error("error processing message", "error", processErr)
177 continue
178 }
179
180 lastTimeUs := int64(data["time_us"].(float64))
181 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
182 l.Error("failed to save last time us", "error", err)
183 continue
184 }
185 }
186 }
187}