this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/jetstream/pkg/client"
15 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 "github.com/sotangled/tangled/api/tangled"
18 "github.com/sotangled/tangled/knotserver/db"
19 "github.com/sotangled/tangled/log"
20)
21
22type JetstreamClient struct {
23 cfg *client.ClientConfig
24 client *client.Client
25 reconnectCh chan struct{}
26 mu sync.RWMutex
27}
28
29func (h *Handle) StartJetstream(ctx context.Context) error {
30 l := h.l.With("component", "jetstream")
31 ctx = log.IntoContext(ctx, l)
32 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
33 dids := []string{}
34
35 lastTimeUs, err := h.getLastTimeUs(ctx)
36 if err != nil {
37 return err
38 }
39
40 cfg := client.DefaultClientConfig()
41 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
42 cfg.WantedCollections = collections
43 cfg.WantedDids = dids
44
45 sched := sequential.NewScheduler("knotserver", l, h.processMessages)
46
47 client, err := client.NewClient(cfg, l, sched)
48 if err != nil {
49 l.Error("failed to create jetstream client", "error", err)
50 }
51
52 jc := &JetstreamClient{
53 cfg: cfg,
54 client: client,
55 reconnectCh: make(chan struct{}),
56 }
57
58 h.jc = jc
59
60 go func() {
61 for len(h.jc.cfg.WantedDids) == 0 {
62 time.Sleep(time.Second)
63 }
64 h.connectAndRead(ctx, &lastTimeUs)
65 }()
66 return nil
67}
68
69func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) {
70 l := log.FromContext(ctx)
71 for {
72 select {
73 case <-h.jc.reconnectCh:
74 l.Info("reconnecting jetstream client")
75 h.jc.client.Scheduler.Shutdown()
76 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
77 l.Error("error reading jetstream", "error", err)
78 }
79 default:
80 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
81 l.Error("error reading jetstream", "error", err)
82 }
83 }
84 }
85}
86
87func (j *JetstreamClient) UpdateDids(dids []string) {
88 j.mu.Lock()
89 j.cfg.WantedDids = dids
90 j.mu.Unlock()
91 j.reconnectCh <- struct{}{}
92}
93
94func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) {
95 l := log.FromContext(ctx)
96 lastTimeUs, err := h.db.GetLastTimeUs()
97 if err != nil {
98 l.Info("couldn't get last time us, starting from now")
99 lastTimeUs = time.Now().UnixMicro()
100 }
101
102 // If last time is older than a week, start from now
103 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
104 lastTimeUs = time.Now().UnixMicro()
105 l.Info("last time us is older than a week. discarding that and starting from now")
106 err = h.db.SaveLastTimeUs(lastTimeUs)
107 if err != nil {
108 l.Error("failed to save last time us")
109 }
110 }
111
112 l.Info("found last time_us", "time_us", lastTimeUs)
113 return lastTimeUs, nil
114}
115
116func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
117 l := log.FromContext(ctx)
118 pk := db.PublicKey{
119 Did: did,
120 PublicKey: record,
121 }
122 if err := h.db.AddPublicKey(pk); err != nil {
123 l.Error("failed to add public key", "error", err)
124 return fmt.Errorf("failed to add public key: %w", err)
125 }
126 l.Info("added public key from firehose", "did", did)
127 return nil
128}
129
130func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
131 l := log.FromContext(ctx)
132
133 if record.Domain != h.c.Server.Hostname {
134 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
135 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
136 }
137
138 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
139 if err != nil || !ok {
140 l.Error("failed to add member", "did", did)
141 return fmt.Errorf("failed to enforce permissions: %w", err)
142 }
143
144 l.Info("adding member")
145 if err := h.e.AddMember(ThisServer, record.Member); err != nil {
146 l.Error("failed to add member", "error", err)
147 return fmt.Errorf("failed to add member: %w", err)
148 }
149 l.Info("added member from firehose", "member", record.Member)
150
151 if err := h.db.AddDid(did); err != nil {
152 l.Error("failed to add did", "error", err)
153 return fmt.Errorf("failed to add did: %w", err)
154 }
155
156 if err := h.fetchAndAddKeys(ctx, did); err != nil {
157 return fmt.Errorf("failed to fetch and add keys: %w", err)
158 }
159
160 h.jc.UpdateDids([]string{did})
161 return nil
162}
163
164func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
165 l := log.FromContext(ctx)
166
167 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
168 if err != nil {
169 l.Error("error building endpoint url", "did", did, "error", err.Error())
170 return fmt.Errorf("error building endpoint url: %w", err)
171 }
172
173 resp, err := http.Get(keysEndpoint)
174 if err != nil {
175 l.Error("error getting keys", "did", did, "error", err)
176 return fmt.Errorf("error getting keys: %w", err)
177 }
178 defer resp.Body.Close()
179
180 if resp.StatusCode == http.StatusNotFound {
181 l.Info("no keys found for did", "did", did)
182 return nil
183 }
184
185 if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") {
186 return fmt.Errorf("unexpected content type: %s", ct)
187 }
188
189 plaintext, err := io.ReadAll(resp.Body)
190 if err != nil {
191 l.Error("error reading response body", "error", err)
192 return fmt.Errorf("error reading response body: %w", err)
193 }
194
195 for _, key := range strings.Split(string(plaintext), "\n") {
196 if key == "" {
197 continue
198 }
199 pk := db.PublicKey{
200 Did: did,
201 }
202 pk.Key = key
203 if err := h.db.AddPublicKey(pk); err != nil {
204 l.Error("failed to add public key", "error", err)
205 return fmt.Errorf("failed to add public key: %w", err)
206 }
207 }
208 return nil
209}
210
211func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
212 did := event.Did
213
214 raw := json.RawMessage(event.Commit.Record)
215
216 switch event.Commit.Collection {
217 case tangled.PublicKeyNSID:
218 var record tangled.PublicKey
219 if err := json.Unmarshal(raw, &record); err != nil {
220 return fmt.Errorf("failed to unmarshal record: %w", err)
221 }
222 if err := h.processPublicKey(ctx, did, record); err != nil {
223 return fmt.Errorf("failed to process public key: %w", err)
224 }
225
226 case tangled.KnotMemberNSID:
227 var record tangled.KnotMember
228 if err := json.Unmarshal(raw, &record); err != nil {
229 return fmt.Errorf("failed to unmarshal record: %w", err)
230 }
231 if err := h.processKnotMember(ctx, did, record); err != nil {
232 return fmt.Errorf("failed to process knot member: %w", err)
233 }
234 }
235
236 lastTimeUs := event.TimeUS
237 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
238 return fmt.Errorf("failed to save last time us: %w", err)
239 }
240
241 return nil
242}