this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log"
9 "net/http"
10 "net/url"
11 "strings"
12 "time"
13
14 "github.com/sotangled/tangled/api/tangled"
15 "github.com/sotangled/tangled/knotserver/db"
16 "github.com/sotangled/tangled/knotserver/jsclient"
17)
18
19func (h *Handle) StartJetstream(ctx context.Context) error {
20 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
21 dids := []string{}
22
23 lastTimeUs, err := h.getLastTimeUs()
24 if err != nil {
25 return err
26 }
27
28 h.js = jsclient.NewJetstreamClient(collections, dids)
29 messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
30 if err != nil {
31 return fmt.Errorf("failed to read from jetstream: %w", err)
32 }
33
34 go h.processMessages(messages)
35
36 return nil
37}
38
39func (h *Handle) getLastTimeUs() (int64, error) {
40 lastTimeUs, err := h.db.GetLastTimeUs()
41 if err != nil {
42 log.Println("couldn't get last time us, starting from now")
43 lastTimeUs = time.Now().UnixMicro()
44 }
45
46 // If last time is older than a week, start from now
47 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
48 lastTimeUs = time.Now().UnixMicro()
49 log.Printf("last time us is older than a week. discarding that and starting from now.")
50 err = h.db.SaveLastTimeUs(lastTimeUs)
51 if err != nil {
52 log.Println("failed to save last time us")
53 }
54 }
55
56 log.Printf("found last time_us %d", lastTimeUs)
57 return lastTimeUs, nil
58}
59
60func (h *Handle) processPublicKey(did string, record map[string]interface{}) {
61 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
62 log.Printf("failed to add public key: %v", err)
63 } else {
64 log.Printf("added public key from firehose: %s", did)
65 }
66}
67
68func (h *Handle) fetchAndAddKeys(did string) {
69 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
70 if err != nil {
71 log.Printf("error building endpoint url: %s: %v", did, err)
72 return
73 }
74
75 resp, err := http.Get(keysEndpoint)
76 if err != nil {
77 log.Printf("error getting keys for %s: %v", did, err)
78 return
79 }
80 defer resp.Body.Close()
81
82 plaintext, err := io.ReadAll(resp.Body)
83 if err != nil {
84 log.Printf("error reading response body: %v", err)
85 return
86 }
87
88 for _, key := range strings.Split(string(plaintext), "\n") {
89 if key == "" {
90 continue
91 }
92 pk := db.PublicKey{
93 Did: did,
94 }
95 pk.Key = key
96 if err := h.db.AddPublicKey(pk); err != nil {
97 log.Printf("failed to add public key: %v", err)
98 }
99 }
100}
101
102func (h *Handle) processKnotMember(did string, record map[string]interface{}) {
103 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
104 if err != nil || !ok {
105 log.Printf("failed to add member from did %s", did)
106 return
107 }
108
109 log.Printf("adding member")
110 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil {
111 log.Printf("failed to add member: %v", err)
112 } else {
113 log.Printf("added member from firehose: %s", record["member"])
114 }
115
116 h.fetchAndAddKeys(did)
117 h.js.UpdateDids([]string{did})
118}
119
120func (h *Handle) processMessages(messages <-chan []byte) {
121 <-h.init
122 log.Println("initalized jetstream watcher")
123
124 for msg := range messages {
125 var data map[string]interface{}
126 if err := json.Unmarshal(msg, &data); err != nil {
127 log.Printf("error unmarshaling message: %v", err)
128 continue
129 }
130
131 if kind, ok := data["kind"].(string); ok && kind == "commit" {
132 commit := data["commit"].(map[string]interface{})
133 did := data["did"].(string)
134 record := commit["record"].(map[string]interface{})
135
136 switch commit["collection"].(string) {
137 case tangled.PublicKeyNSID:
138 h.processPublicKey(did, record)
139 case tangled.KnotMemberNSID:
140 h.processKnotMember(did, record)
141 }
142
143 lastTimeUs := int64(data["time_us"].(float64))
144 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
145 log.Printf("failed to save last time us: %v", err)
146 }
147 }
148 }
149}