this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "strings" 11 "time" 12 13 "github.com/sotangled/tangled/api/tangled" 14 "github.com/sotangled/tangled/knotserver/db" 15 "github.com/sotangled/tangled/knotserver/jsclient" 16 "github.com/sotangled/tangled/log" 17) 18 19func (h *Handle) StartJetstream(ctx context.Context) error { 20 l := h.l.With("component", "jetstream") 21 ctx = log.IntoContext(ctx, l) 22 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 23 dids := []string{} 24 25 lastTimeUs, err := h.getLastTimeUs(ctx) 26 if err != nil { 27 return err 28 } 29 30 h.js = jsclient.NewJetstreamClient(collections, dids) 31 messages, err := h.js.ReadJetstream(ctx, lastTimeUs) 32 if err != nil { 33 return fmt.Errorf("failed to read from jetstream: %w", err) 34 } 35 36 go h.processMessages(ctx, messages) 37 38 return nil 39} 40 41func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) { 42 l := log.FromContext(ctx) 43 lastTimeUs, err := h.db.GetLastTimeUs() 44 if err != nil { 45 l.Info("couldn't get last time us, starting from now") 46 lastTimeUs = time.Now().UnixMicro() 47 } 48 49 // If last time is older than a week, start from now 50 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 51 lastTimeUs = time.Now().UnixMicro() 52 l.Info("last time us is older than a week. discarding that and starting from now") 53 err = h.db.SaveLastTimeUs(lastTimeUs) 54 if err != nil { 55 l.Error("failed to save last time us") 56 } 57 } 58 59 l.Info("found last time_us", "time_us", lastTimeUs) 60 return lastTimeUs, nil 61} 62 63func (h *Handle) processPublicKey(ctx context.Context, did string, record map[string]interface{}) error { 64 l := log.FromContext(ctx) 65 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 66 l.Error("failed to add public key", "error", err) 67 return fmt.Errorf("failed to add public key: %w", err) 68 } 69 l.Info("added public key from firehose", "did", did) 70 return nil 71} 72 73func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 74 l := log.FromContext(ctx) 75 76 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 77 if err != nil { 78 l.Error("error building endpoint url", "did", did, "error", err.Error()) 79 return fmt.Errorf("error building endpoint url: %w", err) 80 } 81 82 resp, err := http.Get(keysEndpoint) 83 if err != nil { 84 l.Error("error getting keys", "did", did, "error", err) 85 return fmt.Errorf("error getting keys: %w", err) 86 } 87 defer resp.Body.Close() 88 89 plaintext, err := io.ReadAll(resp.Body) 90 if err != nil { 91 l.Error("error reading response body", "error", err) 92 return fmt.Errorf("error reading response body: %w", err) 93 } 94 95 for _, key := range strings.Split(string(plaintext), "\n") { 96 if key == "" { 97 continue 98 } 99 pk := db.PublicKey{ 100 Did: did, 101 } 102 pk.Key = key 103 if err := h.db.AddPublicKey(pk); err != nil { 104 l.Error("failed to add public key", "error", err) 105 return fmt.Errorf("failed to add public key: %w", err) 106 } 107 } 108 return nil 109} 110 111func (h *Handle) processKnotMember(ctx context.Context, did string, record map[string]interface{}) error { 112 l := log.FromContext(ctx) 113 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 114 if err != nil || !ok { 115 l.Error("failed to add member", "did", did) 116 return fmt.Errorf("failed to enforce permissions: %w", err) 117 } 118 119 l.Info("adding member") 120 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil { 121 l.Error("failed to add member", "error", err) 122 return fmt.Errorf("failed to add member: %w", err) 123 } 124 l.Info("added member from firehose", "member", record["member"]) 125 126 if err := h.db.AddDid(did); err != nil { 127 l.Error("failed to add did", "error", err) 128 return fmt.Errorf("failed to add did: %w", err) 129 } 130 131 if err := h.fetchAndAddKeys(ctx, did); err != nil { 132 return fmt.Errorf("failed to fetch and add keys: %w", err) 133 } 134 135 h.js.UpdateDids([]string{did}) 136 return nil 137} 138 139func (h *Handle) processMessages(ctx context.Context, messages <-chan []byte) { 140 l := log.FromContext(ctx) 141 l.Info("waiting for knot to be initialized") 142 <-h.init 143 l.Info("initialized jetstream watcher") 144 145 for msg := range messages { 146 var data map[string]interface{} 147 if err := json.Unmarshal(msg, &data); err != nil { 148 l.Error("error unmarshaling message", "error", err) 149 continue 150 } 151 152 if kind, ok := data["kind"].(string); ok && kind == "commit" { 153 commit := data["commit"].(map[string]interface{}) 154 did := data["did"].(string) 155 record := commit["record"].(map[string]interface{}) 156 157 var processErr error 158 switch commit["collection"].(string) { 159 case tangled.PublicKeyNSID: 160 if err := h.processPublicKey(ctx, did, record); err != nil { 161 processErr = fmt.Errorf("failed to process public key: %w", err) 162 } 163 case tangled.KnotMemberNSID: 164 if err := h.processKnotMember(ctx, did, record); err != nil { 165 processErr = fmt.Errorf("failed to process knot member: %w", err) 166 } 167 } 168 169 if processErr != nil { 170 l.Error("error processing message", "error", processErr) 171 continue 172 } 173 174 lastTimeUs := int64(data["time_us"].(float64)) 175 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 176 l.Error("failed to save last time us", "error", err) 177 continue 178 } 179 } 180 } 181}