this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "path" 11 "strings" 12 "time" 13 14 "github.com/sotangled/tangled/api/tangled" 15 "github.com/sotangled/tangled/knotserver/db" 16 "github.com/sotangled/tangled/knotserver/jsclient" 17) 18 19func (h *Handle) StartJetstream(ctx context.Context) error { 20 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 21 dids := []string{} 22 23 lastTimeUs, err := h.getLastTimeUs() 24 if err != nil { 25 return err 26 } 27 28 h.js = jsclient.NewJetstreamClient(collections, dids) 29 messages, err := h.js.ReadJetstream(ctx, lastTimeUs) 30 if err != nil { 31 return fmt.Errorf("failed to read from jetstream: %w", err) 32 } 33 34 go h.processMessages(messages) 35 36 return nil 37} 38 39func (h *Handle) getLastTimeUs() (int64, error) { 40 lastTimeUs, err := h.db.GetLastTimeUs() 41 if err != nil { 42 log.Println("couldn't get last time us, starting from now") 43 lastTimeUs = time.Now().UnixMicro() 44 } 45 46 // If last time is older than a week, start from now 47 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 48 lastTimeUs = time.Now().UnixMicro() 49 log.Printf("last time us is older than a week. discarding that and starting from now.") 50 err = h.db.SaveLastTimeUs(lastTimeUs) 51 if err != nil { 52 log.Println("failed to save last time us") 53 } 54 } 55 56 log.Printf("found last time_us %d", lastTimeUs) 57 return lastTimeUs, nil 58} 59 60func (h *Handle) processPublicKey(did string, record map[string]interface{}) { 61 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 62 log.Printf("failed to add public key: %v", err) 63 } else { 64 log.Printf("added public key from firehose: %s", did) 65 } 66} 67 68func (h *Handle) fetchAndAddKeys(did string) { 69 resp, err := http.Get(path.Join(h.c.AppViewEndpoint, did)) 70 if err != nil { 71 log.Printf("error getting keys for %s: %v", did, err) 72 return 73 } 74 defer resp.Body.Close() 75 76 plaintext, err := io.ReadAll(resp.Body) 77 if err != nil { 78 log.Printf("error reading response body: %v", err) 79 return 80 } 81 82 for _, key := range strings.Split(string(plaintext), "\n") { 83 if key == "" { 84 continue 85 } 86 pk := db.PublicKey{ 87 Did: did, 88 } 89 pk.Key = key 90 if err := h.db.AddPublicKey(pk); err != nil { 91 log.Printf("failed to add public key: %v", err) 92 } 93 } 94} 95 96func (h *Handle) processKnotMember(did string, record map[string]interface{}) { 97 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 98 if err != nil || !ok { 99 log.Printf("failed to add member from did %s", did) 100 return 101 } 102 103 log.Printf("adding member") 104 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil { 105 log.Printf("failed to add member: %v", err) 106 } else { 107 log.Printf("added member from firehose: %s", record["member"]) 108 } 109 110 h.fetchAndAddKeys(did) 111 h.js.UpdateDids([]string{did}) 112} 113 114func (h *Handle) processMessages(messages <-chan []byte) { 115 log.Println("waiting for knot to be initialized") 116 <-h.init 117 log.Println("initalized jetstream watcher") 118 119 for msg := range messages { 120 var data map[string]interface{} 121 if err := json.Unmarshal(msg, &data); err != nil { 122 log.Printf("error unmarshaling message: %v", err) 123 continue 124 } 125 126 if kind, ok := data["kind"].(string); ok && kind == "commit" { 127 commit := data["commit"].(map[string]interface{}) 128 did := data["did"].(string) 129 record := commit["record"].(map[string]interface{}) 130 131 switch commit["collection"].(string) { 132 case tangled.PublicKeyNSID: 133 h.processPublicKey(did, record) 134 case tangled.KnotMemberNSID: 135 h.processKnotMember(did, record) 136 } 137 138 lastTimeUs := int64(data["time_us"].(float64)) 139 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 140 log.Printf("failed to save last time us: %v", err) 141 } 142 } 143 } 144}