this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log"
9 "net/http"
10 "path"
11 "strings"
12 "time"
13
14 "github.com/sotangled/tangled/api/tangled"
15 "github.com/sotangled/tangled/knotserver/db"
16 "github.com/sotangled/tangled/knotserver/jsclient"
17)
18
19func (h *Handle) StartJetstream(ctx context.Context) error {
20 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
21 dids := []string{}
22
23 lastTimeUs, err := h.getLastTimeUs()
24 if err != nil {
25 return err
26 }
27
28 h.js = jsclient.NewJetstreamClient(collections, dids)
29 messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
30 if err != nil {
31 return fmt.Errorf("failed to read from jetstream: %w", err)
32 }
33
34 go h.processMessages(messages)
35
36 return nil
37}
38
39func (h *Handle) getLastTimeUs() (int64, error) {
40 lastTimeUs, err := h.db.GetLastTimeUs()
41 if err != nil {
42 log.Println("couldn't get last time us, starting from now")
43 lastTimeUs = time.Now().UnixMicro()
44 }
45
46 // If last time is older than a week, start from now
47 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
48 lastTimeUs = time.Now().UnixMicro()
49 log.Printf("last time us is older than a week. discarding that and starting from now.")
50 err = h.db.SaveLastTimeUs(lastTimeUs)
51 if err != nil {
52 log.Println("failed to save last time us")
53 }
54 }
55
56 log.Printf("found last time_us %d", lastTimeUs)
57 return lastTimeUs, nil
58}
59
60func (h *Handle) processPublicKey(did string, record map[string]interface{}) {
61 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
62 log.Printf("failed to add public key: %v", err)
63 } else {
64 log.Printf("added public key from firehose: %s", did)
65 }
66}
67
68func (h *Handle) fetchAndAddKeys(did string) {
69 resp, err := http.Get(path.Join(h.c.AppViewEndpoint, did))
70 if err != nil {
71 log.Printf("error getting keys for %s: %v", did, err)
72 return
73 }
74 defer resp.Body.Close()
75
76 plaintext, err := io.ReadAll(resp.Body)
77 if err != nil {
78 log.Printf("error reading response body: %v", err)
79 return
80 }
81
82 for _, key := range strings.Split(string(plaintext), "\n") {
83 if key == "" {
84 continue
85 }
86 pk := db.PublicKey{
87 Did: did,
88 }
89 pk.Key = key
90 if err := h.db.AddPublicKey(pk); err != nil {
91 log.Printf("failed to add public key: %v", err)
92 }
93 }
94}
95
96func (h *Handle) processKnotMember(did string, record map[string]interface{}) {
97 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
98 if err != nil || !ok {
99 log.Printf("failed to add member from did %s", did)
100 return
101 }
102
103 log.Printf("adding member")
104 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil {
105 log.Printf("failed to add member: %v", err)
106 } else {
107 log.Printf("added member from firehose: %s", record["member"])
108 }
109
110 h.fetchAndAddKeys(did)
111 h.js.UpdateDids([]string{did})
112}
113
114func (h *Handle) processMessages(messages <-chan []byte) {
115 log.Println("waiting for knot to be initialized")
116 <-h.init
117 log.Println("initalized jetstream watcher")
118
119 for msg := range messages {
120 var data map[string]interface{}
121 if err := json.Unmarshal(msg, &data); err != nil {
122 log.Printf("error unmarshaling message: %v", err)
123 continue
124 }
125
126 if kind, ok := data["kind"].(string); ok && kind == "commit" {
127 commit := data["commit"].(map[string]interface{})
128 did := data["did"].(string)
129 record := commit["record"].(map[string]interface{})
130
131 switch commit["collection"].(string) {
132 case tangled.PublicKeyNSID:
133 h.processPublicKey(did, record)
134 case tangled.KnotMemberNSID:
135 h.processKnotMember(did, record)
136 }
137
138 lastTimeUs := int64(data["time_us"].(float64))
139 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
140 log.Printf("failed to save last time us: %v", err)
141 }
142 }
143 }
144}