this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 tangled "github.com/sotangled/tangled/api/tangled"
12 "github.com/sotangled/tangled/knotserver/config"
13 "github.com/sotangled/tangled/knotserver/db"
14 "github.com/sotangled/tangled/knotserver/jsclient"
15 "github.com/sotangled/tangled/rbac"
16)
17
18const (
19 ThisServer = "thisserver" // resource identifier for rbac enforcement
20)
21
22type Handle struct {
23 c *config.Config
24 db *db.DB
25 js *jsclient.JetstreamClient
26 e *rbac.Enforcer
27
28 // init is a channel that is closed when the knot has been initailized
29 // i.e. when the first user (knot owner) has been added.
30 init chan struct{}
31 knotInitialized bool
32}
33
34func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer) (http.Handler, error) {
35 r := chi.NewRouter()
36
37 h := Handle{
38 c: c,
39 db: db,
40 e: e,
41 init: make(chan struct{}),
42 }
43
44 err := e.AddDomain(ThisServer)
45 if err != nil {
46 return nil, fmt.Errorf("failed to setup enforcer: %w", err)
47 }
48
49 err = h.StartJetstream(ctx)
50 if err != nil {
51 return nil, fmt.Errorf("failed to start jetstream: %w", err)
52 }
53
54 // Check if the knot knows about any Dids;
55 // if it does, it is already initialized and we can repopulate the
56 // Jetstream subscriptions.
57 dids, err := db.GetAllDids()
58 if err != nil {
59 return nil, fmt.Errorf("failed to get all Dids: %w", err)
60 }
61 if len(dids) > 0 {
62 h.knotInitialized = true
63 close(h.init)
64 h.js.UpdateDids(dids)
65 }
66
67 r.Get("/", h.Index)
68 r.Route("/{did}", func(r chi.Router) {
69 // Repo routes
70 r.Route("/{name}", func(r chi.Router) {
71 r.Get("/", h.RepoIndex)
72 r.Get("/info/refs", h.InfoRefs)
73 r.Post("/git-upload-pack", h.UploadPack)
74
75 r.Route("/tree/{ref}", func(r chi.Router) {
76 r.Get("/*", h.RepoTree)
77 })
78
79 r.Route("/blob/{ref}", func(r chi.Router) {
80 r.Get("/*", h.FileContent)
81 })
82
83 r.Get("/log/{ref}", h.Log)
84 r.Get("/archive/{file}", h.Archive)
85 r.Get("/commit/{ref}", h.Diff)
86 r.Get("/refs/", h.Refs)
87 })
88 })
89
90 // Create a new repository.
91 r.Route("/repo", func(r chi.Router) {
92 r.Use(h.VerifySignature)
93 r.Put("/new", h.NewRepo)
94 })
95
96 // Initialize the knot with an owner and public key.
97 r.With(h.VerifySignature).Post("/init", h.Init)
98
99 // Health check. Used for two-way verification with appview.
100 r.With(h.VerifySignature).Get("/health", h.Health)
101
102 // All public keys on the knot.
103 r.Get("/keys", h.Keys)
104
105 return r, nil
106}
107
108func (h *Handle) StartJetstream(ctx context.Context) error {
109 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
110 dids := []string{}
111
112 h.js = jsclient.NewJetstreamClient(collections, dids)
113 messages, err := h.js.ReadJetstream(ctx)
114 if err != nil {
115 return fmt.Errorf("failed to read from jetstream: %w", err)
116 }
117
118 go func() {
119 log.Println("waiting for knot to be initialized")
120 <-h.init
121 log.Println("initalized jetstream watcher")
122
123 for msg := range messages {
124 var data map[string]interface{}
125 if err := json.Unmarshal(msg, &data); err != nil {
126 log.Printf("error unmarshaling message: %v", err)
127 continue
128 }
129
130 if kind, ok := data["kind"].(string); ok && kind == "commit" {
131 commit := data["commit"].(map[string]interface{})
132
133 switch commit["collection"].(string) {
134 case tangled.PublicKeyNSID:
135 did := data["did"].(string)
136 record := commit["record"].(map[string]interface{})
137 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
138 log.Printf("failed to add public key: %v", err)
139 } else {
140 log.Printf("added public key from firehose: %s", data["did"])
141 }
142 case tangled.KnotMemberNSID:
143 did := data["did"].(string)
144 record := commit["record"].(map[string]interface{})
145 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
146 if err != nil || !ok {
147 log.Printf("failed to add member from did %s", did)
148 } else {
149 log.Printf("adding member")
150 h.e.AddMember(ThisServer, record["member"].(string))
151 }
152 default:
153 }
154 }
155
156 }
157 }()
158
159 return nil
160}