this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "net/http"
9 "time"
10
11 "github.com/go-chi/chi/v5"
12 tangled "github.com/sotangled/tangled/api/tangled"
13 "github.com/sotangled/tangled/knotserver/config"
14 "github.com/sotangled/tangled/knotserver/db"
15 "github.com/sotangled/tangled/knotserver/jsclient"
16 "github.com/sotangled/tangled/rbac"
17)
18
19const (
20 ThisServer = "thisserver" // resource identifier for rbac enforcement
21)
22
23type Handle struct {
24 c *config.Config
25 db *db.DB
26 js *jsclient.JetstreamClient
27 e *rbac.Enforcer
28
29 // init is a channel that is closed when the knot has been initailized
30 // i.e. when the first user (knot owner) has been added.
31 init chan struct{}
32 knotInitialized bool
33}
34
35func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer) (http.Handler, error) {
36 r := chi.NewRouter()
37
38 h := Handle{
39 c: c,
40 db: db,
41 e: e,
42 init: make(chan struct{}),
43 }
44
45 err := e.AddDomain(ThisServer)
46 if err != nil {
47 return nil, fmt.Errorf("failed to setup enforcer: %w", err)
48 }
49
50 err = h.StartJetstream(ctx)
51 if err != nil {
52 return nil, fmt.Errorf("failed to start jetstream: %w", err)
53 }
54
55 // Check if the knot knows about any Dids;
56 // if it does, it is already initialized and we can repopulate the
57 // Jetstream subscriptions.
58 dids, err := db.GetAllDids()
59 if err != nil {
60 return nil, fmt.Errorf("failed to get all Dids: %w", err)
61 }
62 if len(dids) > 0 {
63 h.knotInitialized = true
64 close(h.init)
65 h.js.UpdateDids(dids)
66 }
67
68 r.Get("/", h.Index)
69 r.Route("/{did}", func(r chi.Router) {
70 // Repo routes
71 r.Route("/{name}", func(r chi.Router) {
72 r.Get("/", h.RepoIndex)
73 r.Get("/info/refs", h.InfoRefs)
74 r.Post("/git-upload-pack", h.UploadPack)
75
76 r.Route("/tree/{ref}", func(r chi.Router) {
77 r.Get("/*", h.RepoTree)
78 })
79
80 r.Route("/blob/{ref}", func(r chi.Router) {
81 r.Get("/*", h.FileContent)
82 })
83
84 r.Get("/log/{ref}", h.Log)
85 r.Get("/archive/{file}", h.Archive)
86 r.Get("/commit/{ref}", h.Diff)
87 r.Get("/refs/", h.Refs)
88 })
89 })
90
91 // Create a new repository.
92 r.Route("/repo", func(r chi.Router) {
93 r.Use(h.VerifySignature)
94 r.Put("/new", h.NewRepo)
95 })
96
97 // Initialize the knot with an owner and public key.
98 r.With(h.VerifySignature).Post("/init", h.Init)
99
100 // Health check. Used for two-way verification with appview.
101 r.With(h.VerifySignature).Get("/health", h.Health)
102
103 // All public keys on the knot.
104 r.Get("/keys", h.Keys)
105
106 return r, nil
107}
108
109func (h *Handle) StartJetstream(ctx context.Context) error {
110 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
111 dids := []string{}
112
113 var lastTimeUs int64
114 var err error
115 lastTimeUs, err = h.db.GetLastTimeUs()
116 if err != nil {
117 log.Println("couldn't get last time us, starting from now")
118 lastTimeUs = time.Now().UnixMicro()
119 }
120 // If last time is older than a week, start from now
121 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
122 lastTimeUs = time.Now().UnixMicro()
123 log.Printf("last time us is older than a week. discarding that and starting from now.")
124 err = h.db.SaveLastTimeUs(lastTimeUs)
125 if err != nil {
126 log.Println("failed to save last time us")
127 }
128 }
129
130 log.Printf("found last time_us %d", lastTimeUs)
131
132 h.js = jsclient.NewJetstreamClient(collections, dids)
133 messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
134 if err != nil {
135 return fmt.Errorf("failed to read from jetstream: %w", err)
136 }
137
138 go func() {
139 log.Println("waiting for knot to be initialized")
140 <-h.init
141 log.Println("initalized jetstream watcher")
142
143 for msg := range messages {
144 var data map[string]interface{}
145 if err := json.Unmarshal(msg, &data); err != nil {
146 log.Printf("error unmarshaling message: %v", err)
147 continue
148 }
149
150 if kind, ok := data["kind"].(string); ok && kind == "commit" {
151 commit := data["commit"].(map[string]interface{})
152
153 switch commit["collection"].(string) {
154 case tangled.PublicKeyNSID:
155 did := data["did"].(string)
156 record := commit["record"].(map[string]interface{})
157 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
158 log.Printf("failed to add public key: %v", err)
159 } else {
160 log.Printf("added public key from firehose: %s", data["did"])
161 }
162 case tangled.KnotMemberNSID:
163 did := data["did"].(string)
164 record := commit["record"].(map[string]interface{})
165 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
166 if err != nil || !ok {
167 log.Printf("failed to add member from did %s", did)
168 } else {
169 log.Printf("adding member")
170 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil {
171 log.Printf("failed to add member: %v", err)
172 } else {
173 log.Printf("added member from firehose: %s", record["member"])
174 }
175 }
176 default:
177 }
178
179 lastTimeUs := int64(data["time_us"].(float64))
180 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
181 log.Printf("failed to save last time us: %v", err)
182 }
183 }
184
185 }
186 }()
187
188 return nil
189}