this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 tangled "github.com/sotangled/tangled/api/tangled"
12 "github.com/sotangled/tangled/knotserver/config"
13 "github.com/sotangled/tangled/knotserver/db"
14 "github.com/sotangled/tangled/knotserver/jsclient"
15)
16
17type Handle struct {
18 c *config.Config
19 db *db.DB
20 js *jsclient.JetstreamClient
21
22 // init is a channel that is closed when the knot has been initailized
23 // i.e. when the first user (knot owner) has been added.
24 init chan struct{}
25 knotInitialized bool
26}
27
28func Setup(ctx context.Context, c *config.Config, db *db.DB) (http.Handler, error) {
29 r := chi.NewRouter()
30
31 h := Handle{
32 c: c,
33 db: db,
34 init: make(chan struct{}),
35 }
36
37 err := h.StartJetstream(ctx)
38 if err != nil {
39 return nil, fmt.Errorf("failed to start jetstream: %w", err)
40 }
41
42 // Check if the knot knows about any DIDs;
43 // if it does, it is already initialized and we can repopulate the
44 // Jetstream subscriptions.
45 dids, err := db.GetAllDIDs()
46 if err != nil {
47 return nil, fmt.Errorf("failed to get all DIDs: %w", err)
48 }
49 if len(dids) > 0 {
50 h.knotInitialized = true
51 close(h.init)
52 h.js.UpdateDids(dids)
53 }
54
55 r.Get("/", h.Index)
56 r.Route("/{did}", func(r chi.Router) {
57 // Repo routes
58 r.Route("/{name}", func(r chi.Router) {
59 r.Get("/", h.RepoIndex)
60 r.Get("/info/refs", h.InfoRefs)
61 r.Post("/git-upload-pack", h.UploadPack)
62
63 r.Route("/tree/{ref}", func(r chi.Router) {
64 r.Get("/*", h.RepoTree)
65 })
66
67 r.Route("/blob/{ref}", func(r chi.Router) {
68 r.Get("/*", h.FileContent)
69 })
70
71 r.Get("/log/{ref}", h.Log)
72 r.Get("/archive/{file}", h.Archive)
73 r.Get("/commit/{ref}", h.Diff)
74 r.Get("/refs/", h.Refs)
75 })
76 })
77
78 // Create a new repository.
79 r.Route("/repo", func(r chi.Router) {
80 r.Use(h.VerifySignature)
81 r.Put("/new", h.NewRepo)
82 })
83
84 // Initialize the knot with an owner and public key.
85 r.With(h.VerifySignature).Post("/init", h.Init)
86
87 // Health check. Used for two-way verification with appview.
88 r.With(h.VerifySignature).Get("/health", h.Health)
89
90 // All public keys on the knot.
91 r.Get("/keys", h.Keys)
92
93 return r, nil
94}
95
96func (h *Handle) StartJetstream(ctx context.Context) error {
97 colections := []string{tangled.PublicKeyNSID}
98 dids := []string{}
99
100 h.js = jsclient.NewJetstreamClient(colections, dids)
101 messages, err := h.js.ReadJetstream(ctx)
102 if err != nil {
103 return fmt.Errorf("failed to read from jetstream: %w", err)
104 }
105
106 go func() {
107 log.Println("waiting for knot to be initialized")
108 <-h.init
109 log.Println("initalized jetstream watcher")
110
111 for msg := range messages {
112 var data map[string]interface{}
113 if err := json.Unmarshal(msg, &data); err != nil {
114 log.Printf("error unmarshaling message: %v", err)
115 continue
116 }
117
118 if kind, ok := data["kind"].(string); ok && kind == "commit" {
119 commit := data["commit"].(map[string]interface{})
120
121 switch commit["collection"].(string) {
122 case tangled.PublicKeyNSID:
123 did := data["did"].(string)
124 record := commit["record"].(map[string]interface{})
125 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
126 log.Printf("failed to add public key: %v", err)
127 } else {
128 log.Printf("added public key from firehose: %s", data["did"])
129 }
130 default:
131 }
132 }
133
134 }
135 }()
136
137 return nil
138}