this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "strings" 11 "time" 12 13 "github.com/sotangled/tangled/api/tangled" 14 "github.com/sotangled/tangled/knotserver/db" 15 "github.com/sotangled/tangled/knotserver/jsclient" 16 "github.com/sotangled/tangled/log" 17) 18 19func (h *Handle) StartJetstream(ctx context.Context) error { 20 l := h.l.With("component", "jetstream") 21 ctx = log.IntoContext(ctx, l) 22 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 23 dids := []string{} 24 25 lastTimeUs, err := h.getLastTimeUs(ctx) 26 if err != nil { 27 return err 28 } 29 30 h.js = jsclient.NewJetstreamClient(collections, dids) 31 messages, err := h.js.ReadJetstream(ctx, lastTimeUs) 32 if err != nil { 33 return fmt.Errorf("failed to read from jetstream: %w", err) 34 } 35 36 go h.processMessages(ctx, messages) 37 38 return nil 39} 40 41func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) { 42 l := log.FromContext(ctx) 43 lastTimeUs, err := h.db.GetLastTimeUs() 44 if err != nil { 45 l.Info("couldn't get last time us, starting from now") 46 lastTimeUs = time.Now().UnixMicro() 47 } 48 49 // If last time is older than a week, start from now 50 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 51 lastTimeUs = time.Now().UnixMicro() 52 l.Info("last time us is older than a week. discarding that and starting from now") 53 err = h.db.SaveLastTimeUs(lastTimeUs) 54 if err != nil { 55 l.Error("failed to save last time us") 56 } 57 } 58 59 l.Info("found last time_us", "time_us", lastTimeUs) 60 return lastTimeUs, nil 61} 62 63func (h *Handle) processPublicKey(ctx context.Context, did string, record map[string]interface{}) error { 64 l := log.FromContext(ctx) 65 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 66 l.Error("failed to add public key", "error", err) 67 return fmt.Errorf("failed to add public key: %w", err) 68 } 69 l.Info("added public key from firehose", "did", did) 70 return nil 71} 72 73func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 74 l := log.FromContext(ctx) 75 76 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 77 if err != nil { 78 l.Error("error building endpoint url", "did", did, "error", err.Error()) 79 return fmt.Errorf("error building endpoint url: %w", err) 80 } 81 82 resp, err := http.Get(keysEndpoint) 83 if err != nil { 84 l.Error("error getting keys", "did", did, "error", err) 85 return fmt.Errorf("error getting keys: %w", err) 86 } 87 defer resp.Body.Close() 88 89 if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") { 90 l.Error("unexpected content type", "content-type", ct) 91 return fmt.Errorf("unexpected content type: %s", ct) 92 } 93 94 plaintext, err := io.ReadAll(resp.Body) 95 if err != nil { 96 l.Error("error reading response body", "error", err) 97 return fmt.Errorf("error reading response body: %w", err) 98 } 99 100 for _, key := range strings.Split(string(plaintext), "\n") { 101 if key == "" { 102 continue 103 } 104 pk := db.PublicKey{ 105 Did: did, 106 } 107 pk.Key = key 108 if err := h.db.AddPublicKey(pk); err != nil { 109 l.Error("failed to add public key", "error", err) 110 return fmt.Errorf("failed to add public key: %w", err) 111 } 112 } 113 return nil 114} 115 116func (h *Handle) processKnotMember(ctx context.Context, did string, record map[string]interface{}) error { 117 l := log.FromContext(ctx) 118 119 if record["domain"] != h.c.Server.Hostname { 120 l.Error("domain mismatch", "domain", record["domain"], "expected", h.c.Server.Hostname) 121 return fmt.Errorf("domain mismatch: %s != %s", record["domain"], h.c.Server.Hostname) 122 } 123 124 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 125 if err != nil || !ok { 126 l.Error("failed to add member", "did", did) 127 return fmt.Errorf("failed to enforce permissions: %w", err) 128 } 129 130 l.Info("adding member") 131 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil { 132 l.Error("failed to add member", "error", err) 133 return fmt.Errorf("failed to add member: %w", err) 134 } 135 l.Info("added member from firehose", "member", record["member"]) 136 137 if err := h.db.AddDid(did); err != nil { 138 l.Error("failed to add did", "error", err) 139 return fmt.Errorf("failed to add did: %w", err) 140 } 141 142 if err := h.fetchAndAddKeys(ctx, did); err != nil { 143 return fmt.Errorf("failed to fetch and add keys: %w", err) 144 } 145 146 h.js.UpdateDids([]string{did}) 147 return nil 148} 149 150func (h *Handle) processMessages(ctx context.Context, messages <-chan []byte) { 151 l := log.FromContext(ctx) 152 l.Info("waiting for knot to be initialized") 153 <-h.init 154 l.Info("initialized jetstream watcher") 155 156 for msg := range messages { 157 var data map[string]interface{} 158 if err := json.Unmarshal(msg, &data); err != nil { 159 l.Error("error unmarshaling message", "error", err) 160 continue 161 } 162 163 if kind, ok := data["kind"].(string); ok && kind == "commit" { 164 commit := data["commit"].(map[string]interface{}) 165 did := data["did"].(string) 166 record := commit["record"].(map[string]interface{}) 167 168 var processErr error 169 switch commit["collection"].(string) { 170 case tangled.PublicKeyNSID: 171 if err := h.processPublicKey(ctx, did, record); err != nil { 172 processErr = fmt.Errorf("failed to process public key: %w", err) 173 } 174 case tangled.KnotMemberNSID: 175 if err := h.processKnotMember(ctx, did, record); err != nil { 176 processErr = fmt.Errorf("failed to process knot member: %w", err) 177 } 178 } 179 180 if processErr != nil { 181 l.Error("error processing message", "error", processErr) 182 continue 183 } 184 185 lastTimeUs := int64(data["time_us"].(float64)) 186 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 187 l.Error("failed to save last time us", "error", err) 188 continue 189 } 190 } 191 } 192}