this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/jetstream/pkg/client" 15 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 16 "github.com/bluesky-social/jetstream/pkg/models" 17 "github.com/sotangled/tangled/api/tangled" 18 "github.com/sotangled/tangled/knotserver/db" 19 "github.com/sotangled/tangled/log" 20) 21 22type JetstreamClient struct { 23 cfg *client.ClientConfig 24 client *client.Client 25 reconnectCh chan struct{} 26 mu sync.RWMutex 27} 28 29func (h *Handle) StartJetstream(ctx context.Context) error { 30 l := h.l.With("component", "jetstream") 31 ctx = log.IntoContext(ctx, l) 32 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 33 dids := []string{} 34 35 lastTimeUs, err := h.getLastTimeUs(ctx) 36 if err != nil { 37 return err 38 } 39 40 cfg := client.DefaultClientConfig() 41 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 42 cfg.WantedCollections = collections 43 cfg.WantedDids = dids 44 45 sched := sequential.NewScheduler("knotserver", l, h.processMessages) 46 47 client, err := client.NewClient(cfg, l, sched) 48 if err != nil { 49 l.Error("failed to create jetstream client", "error", err) 50 } 51 52 jc := &JetstreamClient{ 53 cfg: cfg, 54 client: client, 55 reconnectCh: make(chan struct{}), 56 } 57 58 h.jc = jc 59 60 go func() { 61 for len(h.jc.cfg.WantedDids) == 0 { 62 time.Sleep(time.Second) 63 } 64 h.connectAndRead(ctx, &lastTimeUs) 65 }() 66 return nil 67} 68 69func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) { 70 l := log.FromContext(ctx) 71 for { 72 select { 73 case <-h.jc.reconnectCh: 74 l.Info("reconnecting jetstream client") 75 h.jc.client.Scheduler.Shutdown() 76 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 77 l.Error("error reading jetstream", "error", err) 78 } 79 default: 80 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 81 l.Error("error reading jetstream", "error", err) 82 } 83 } 84 } 85} 86 87func (j *JetstreamClient) UpdateDids(dids []string) { 88 j.mu.Lock() 89 j.cfg.WantedDids = dids 90 j.mu.Unlock() 91 j.reconnectCh <- struct{}{} 92} 93 94func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) { 95 l := log.FromContext(ctx) 96 lastTimeUs, err := h.db.GetLastTimeUs() 97 if err != nil { 98 l.Info("couldn't get last time us, starting from now") 99 lastTimeUs = time.Now().UnixMicro() 100 } 101 102 // If last time is older than a week, start from now 103 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 104 lastTimeUs = time.Now().UnixMicro() 105 l.Info("last time us is older than a week. discarding that and starting from now") 106 err = h.db.SaveLastTimeUs(lastTimeUs) 107 if err != nil { 108 l.Error("failed to save last time us") 109 } 110 } 111 112 l.Info("found last time_us", "time_us", lastTimeUs) 113 return lastTimeUs, nil 114} 115 116func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 117 l := log.FromContext(ctx) 118 pk := db.PublicKey{ 119 Did: did, 120 PublicKey: record, 121 } 122 if err := h.db.AddPublicKey(pk); err != nil { 123 l.Error("failed to add public key", "error", err) 124 return fmt.Errorf("failed to add public key: %w", err) 125 } 126 l.Info("added public key from firehose", "did", did) 127 return nil 128} 129 130func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 131 l := log.FromContext(ctx) 132 133 if record.Domain != h.c.Server.Hostname { 134 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 135 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 136 } 137 138 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 139 if err != nil || !ok { 140 l.Error("failed to add member", "did", did) 141 return fmt.Errorf("failed to enforce permissions: %w", err) 142 } 143 144 l.Info("adding member") 145 if err := h.e.AddMember(ThisServer, record.Member); err != nil { 146 l.Error("failed to add member", "error", err) 147 return fmt.Errorf("failed to add member: %w", err) 148 } 149 l.Info("added member from firehose", "member", record.Member) 150 151 if err := h.db.AddDid(did); err != nil { 152 l.Error("failed to add did", "error", err) 153 return fmt.Errorf("failed to add did: %w", err) 154 } 155 156 if err := h.fetchAndAddKeys(ctx, did); err != nil { 157 return fmt.Errorf("failed to fetch and add keys: %w", err) 158 } 159 160 h.jc.UpdateDids([]string{did}) 161 return nil 162} 163 164func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 165 l := log.FromContext(ctx) 166 167 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 168 if err != nil { 169 l.Error("error building endpoint url", "did", did, "error", err.Error()) 170 return fmt.Errorf("error building endpoint url: %w", err) 171 } 172 173 resp, err := http.Get(keysEndpoint) 174 if err != nil { 175 l.Error("error getting keys", "did", did, "error", err) 176 return fmt.Errorf("error getting keys: %w", err) 177 } 178 defer resp.Body.Close() 179 180 if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") { 181 return fmt.Errorf("unexpected content type: %s", ct) 182 } 183 184 plaintext, err := io.ReadAll(resp.Body) 185 if err != nil { 186 l.Error("error reading response body", "error", err) 187 return fmt.Errorf("error reading response body: %w", err) 188 } 189 190 for _, key := range strings.Split(string(plaintext), "\n") { 191 if key == "" { 192 continue 193 } 194 pk := db.PublicKey{ 195 Did: did, 196 } 197 pk.Key = key 198 if err := h.db.AddPublicKey(pk); err != nil { 199 l.Error("failed to add public key", "error", err) 200 return fmt.Errorf("failed to add public key: %w", err) 201 } 202 } 203 return nil 204} 205 206func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 207 did := event.Did 208 209 raw := json.RawMessage(event.Commit.Record) 210 211 switch event.Commit.Collection { 212 case tangled.PublicKeyNSID: 213 var record tangled.PublicKey 214 if err := json.Unmarshal(raw, &record); err != nil { 215 return fmt.Errorf("failed to unmarshal record: %w", err) 216 } 217 if err := h.processPublicKey(ctx, did, record); err != nil { 218 return fmt.Errorf("failed to process public key: %w", err) 219 } 220 221 case tangled.KnotMemberNSID: 222 var record tangled.KnotMember 223 if err := json.Unmarshal(raw, &record); err != nil { 224 return fmt.Errorf("failed to unmarshal record: %w", err) 225 } 226 if err := h.processKnotMember(ctx, did, record); err != nil { 227 return fmt.Errorf("failed to process knot member: %w", err) 228 } 229 } 230 231 lastTimeUs := event.TimeUS 232 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 233 return fmt.Errorf("failed to save last time us: %w", err) 234 } 235 236 return nil 237}