this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/jetstream/pkg/client"
15 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 "github.com/sotangled/tangled/api/tangled"
18 "github.com/sotangled/tangled/knotserver/db"
19 "github.com/sotangled/tangled/log"
20)
21
22type JetstreamClient struct {
23 cfg *client.ClientConfig
24 client *client.Client
25 reconnectCh chan struct{}
26 mu sync.RWMutex
27}
28
29func (h *Handle) StartJetstream(ctx context.Context) error {
30 l := h.l.With("component", "jetstream")
31 ctx = log.IntoContext(ctx, l)
32 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
33 dids := []string{}
34
35 lastTimeUs, err := h.getLastTimeUs(ctx)
36 if err != nil {
37 return err
38 }
39
40 cfg := client.DefaultClientConfig()
41 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
42 cfg.WantedCollections = collections
43 cfg.WantedDids = dids
44
45 sched := sequential.NewScheduler("knotserver", l, h.processMessages)
46
47 client, err := client.NewClient(cfg, l, sched)
48 if err != nil {
49 l.Error("failed to create jetstream client", "error", err)
50 }
51
52 jc := &JetstreamClient{
53 cfg: cfg,
54 client: client,
55 reconnectCh: make(chan struct{}),
56 }
57
58 h.jc = jc
59
60 go func() {
61 for len(h.jc.cfg.WantedDids) == 0 {
62 time.Sleep(time.Second)
63 }
64 h.connectAndRead(ctx, &lastTimeUs)
65 }()
66 return nil
67}
68
69func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) {
70 l := log.FromContext(ctx)
71 for {
72 select {
73 case <-h.jc.reconnectCh:
74 l.Info("reconnecting jetstream client")
75 h.jc.client.Scheduler.Shutdown()
76 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
77 l.Error("error reading jetstream", "error", err)
78 }
79 default:
80 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
81 l.Error("error reading jetstream", "error", err)
82 }
83 }
84 }
85}
86
87func (j *JetstreamClient) UpdateDids(dids []string) {
88 j.mu.Lock()
89 j.cfg.WantedDids = dids
90 j.mu.Unlock()
91 j.reconnectCh <- struct{}{}
92}
93
94func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) {
95 l := log.FromContext(ctx)
96 lastTimeUs, err := h.db.GetLastTimeUs()
97 if err != nil {
98 l.Info("couldn't get last time us, starting from now")
99 lastTimeUs = time.Now().UnixMicro()
100 }
101
102 // If last time is older than a week, start from now
103 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
104 lastTimeUs = time.Now().UnixMicro()
105 l.Info("last time us is older than a week. discarding that and starting from now")
106 err = h.db.SaveLastTimeUs(lastTimeUs)
107 if err != nil {
108 l.Error("failed to save last time us")
109 }
110 }
111
112 l.Info("found last time_us", "time_us", lastTimeUs)
113 return lastTimeUs, nil
114}
115
116func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
117 l := log.FromContext(ctx)
118 pk := db.PublicKey{
119 Did: did,
120 PublicKey: record,
121 }
122 if err := h.db.AddPublicKey(pk); err != nil {
123 l.Error("failed to add public key", "error", err)
124 return fmt.Errorf("failed to add public key: %w", err)
125 }
126 l.Info("added public key from firehose", "did", did)
127 return nil
128}
129
130func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
131 l := log.FromContext(ctx)
132
133 if record.Domain != h.c.Server.Hostname {
134 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
135 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
136 }
137
138 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
139 if err != nil || !ok {
140 l.Error("failed to add member", "did", did)
141 return fmt.Errorf("failed to enforce permissions: %w", err)
142 }
143
144 l.Info("adding member")
145 if err := h.e.AddMember(ThisServer, record.Member); err != nil {
146 l.Error("failed to add member", "error", err)
147 return fmt.Errorf("failed to add member: %w", err)
148 }
149 l.Info("added member from firehose", "member", record.Member)
150
151 if err := h.db.AddDid(did); err != nil {
152 l.Error("failed to add did", "error", err)
153 return fmt.Errorf("failed to add did: %w", err)
154 }
155
156 if err := h.fetchAndAddKeys(ctx, did); err != nil {
157 return fmt.Errorf("failed to fetch and add keys: %w", err)
158 }
159
160 h.jc.UpdateDids([]string{did})
161 return nil
162}
163
164func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
165 l := log.FromContext(ctx)
166
167 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
168 if err != nil {
169 l.Error("error building endpoint url", "did", did, "error", err.Error())
170 return fmt.Errorf("error building endpoint url: %w", err)
171 }
172
173 resp, err := http.Get(keysEndpoint)
174 if err != nil {
175 l.Error("error getting keys", "did", did, "error", err)
176 return fmt.Errorf("error getting keys: %w", err)
177 }
178 defer resp.Body.Close()
179
180 if resp.StatusCode == http.StatusNotFound {
181 l.Info("no keys found for did", "did", did)
182 return nil
183 }
184
185 plaintext, err := io.ReadAll(resp.Body)
186 if err != nil {
187 l.Error("error reading response body", "error", err)
188 return fmt.Errorf("error reading response body: %w", err)
189 }
190
191 for _, key := range strings.Split(string(plaintext), "\n") {
192 if key == "" {
193 continue
194 }
195 pk := db.PublicKey{
196 Did: did,
197 }
198 pk.Key = key
199 if err := h.db.AddPublicKey(pk); err != nil {
200 l.Error("failed to add public key", "error", err)
201 return fmt.Errorf("failed to add public key: %w", err)
202 }
203 }
204 return nil
205}
206
207func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
208 did := event.Did
209
210 raw := json.RawMessage(event.Commit.Record)
211
212 switch event.Commit.Collection {
213 case tangled.PublicKeyNSID:
214 var record tangled.PublicKey
215 if err := json.Unmarshal(raw, &record); err != nil {
216 return fmt.Errorf("failed to unmarshal record: %w", err)
217 }
218 if err := h.processPublicKey(ctx, did, record); err != nil {
219 return fmt.Errorf("failed to process public key: %w", err)
220 }
221
222 case tangled.KnotMemberNSID:
223 var record tangled.KnotMember
224 if err := json.Unmarshal(raw, &record); err != nil {
225 return fmt.Errorf("failed to unmarshal record: %w", err)
226 }
227 if err := h.processKnotMember(ctx, did, record); err != nil {
228 return fmt.Errorf("failed to process knot member: %w", err)
229 }
230 }
231
232 lastTimeUs := event.TimeUS
233 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
234 return fmt.Errorf("failed to save last time us: %w", err)
235 }
236
237 return nil
238}