this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "net/url" 11 "strings" 12 "time" 13 14 "github.com/sotangled/tangled/api/tangled" 15 "github.com/sotangled/tangled/knotserver/db" 16 "github.com/sotangled/tangled/knotserver/jsclient" 17) 18 19func (h *Handle) StartJetstream(ctx context.Context) error { 20 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 21 dids := []string{} 22 23 lastTimeUs, err := h.getLastTimeUs() 24 if err != nil { 25 return err 26 } 27 28 h.js = jsclient.NewJetstreamClient(collections, dids) 29 messages, err := h.js.ReadJetstream(ctx, lastTimeUs) 30 if err != nil { 31 return fmt.Errorf("failed to read from jetstream: %w", err) 32 } 33 34 go h.processMessages(messages) 35 36 return nil 37} 38 39func (h *Handle) getLastTimeUs() (int64, error) { 40 lastTimeUs, err := h.db.GetLastTimeUs() 41 if err != nil { 42 log.Println("couldn't get last time us, starting from now") 43 lastTimeUs = time.Now().UnixMicro() 44 } 45 46 // If last time is older than a week, start from now 47 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 48 lastTimeUs = time.Now().UnixMicro() 49 log.Printf("last time us is older than a week. discarding that and starting from now.") 50 err = h.db.SaveLastTimeUs(lastTimeUs) 51 if err != nil { 52 log.Println("failed to save last time us") 53 } 54 } 55 56 log.Printf("found last time_us %d", lastTimeUs) 57 return lastTimeUs, nil 58} 59 60func (h *Handle) processPublicKey(did string, record map[string]interface{}) { 61 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 62 log.Printf("failed to add public key: %v", err) 63 } else { 64 log.Printf("added public key from firehose: %s", did) 65 } 66} 67 68func (h *Handle) fetchAndAddKeys(did string) { 69 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 70 if err != nil { 71 log.Printf("error building endpoint url: %s: %v", did, err) 72 return 73 } 74 75 resp, err := http.Get(keysEndpoint) 76 if err != nil { 77 log.Printf("error getting keys for %s: %v", did, err) 78 return 79 } 80 defer resp.Body.Close() 81 82 plaintext, err := io.ReadAll(resp.Body) 83 if err != nil { 84 log.Printf("error reading response body: %v", err) 85 return 86 } 87 88 for _, key := range strings.Split(string(plaintext), "\n") { 89 if key == "" { 90 continue 91 } 92 pk := db.PublicKey{ 93 Did: did, 94 } 95 pk.Key = key 96 if err := h.db.AddPublicKey(pk); err != nil { 97 log.Printf("failed to add public key: %v", err) 98 } 99 } 100} 101 102func (h *Handle) processKnotMember(did string, record map[string]interface{}) { 103 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 104 if err != nil || !ok { 105 log.Printf("failed to add member from did %s", did) 106 return 107 } 108 109 log.Printf("adding member") 110 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil { 111 log.Printf("failed to add member: %v", err) 112 } else { 113 log.Printf("added member from firehose: %s", record["member"]) 114 } 115 116 h.fetchAndAddKeys(did) 117 h.js.UpdateDids([]string{did}) 118} 119 120func (h *Handle) processMessages(messages <-chan []byte) { 121 <-h.init 122 log.Println("initalized jetstream watcher") 123 124 for msg := range messages { 125 var data map[string]interface{} 126 if err := json.Unmarshal(msg, &data); err != nil { 127 log.Printf("error unmarshaling message: %v", err) 128 continue 129 } 130 131 if kind, ok := data["kind"].(string); ok && kind == "commit" { 132 commit := data["commit"].(map[string]interface{}) 133 did := data["did"].(string) 134 record := commit["record"].(map[string]interface{}) 135 136 switch commit["collection"].(string) { 137 case tangled.PublicKeyNSID: 138 h.processPublicKey(did, record) 139 case tangled.KnotMemberNSID: 140 h.processKnotMember(did, record) 141 } 142 143 lastTimeUs := int64(data["time_us"].(float64)) 144 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 145 log.Printf("failed to save last time us: %v", err) 146 } 147 } 148 } 149}