this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 tangled "github.com/sotangled/tangled/api/tangled" 12 "github.com/sotangled/tangled/knotserver/config" 13 "github.com/sotangled/tangled/knotserver/db" 14 "github.com/sotangled/tangled/knotserver/jsclient" 15) 16 17type Handle struct { 18 c *config.Config 19 db *db.DB 20 js *jsclient.JetstreamClient 21 22 // init is a channel that is closed when the knot has been initailized 23 // i.e. when the first user (knot owner) has been added. 24 init chan struct{} 25 knotInitialized bool 26} 27 28func Setup(ctx context.Context, c *config.Config, db *db.DB) (http.Handler, error) { 29 r := chi.NewRouter() 30 31 h := Handle{ 32 c: c, 33 db: db, 34 init: make(chan struct{}), 35 } 36 37 err := h.StartJetstream(ctx) 38 if err != nil { 39 return nil, fmt.Errorf("failed to start jetstream: %w", err) 40 } 41 42 // Check if the knot knows about any DIDs; 43 // if it does, it is already initialized and we can repopulate the 44 // Jetstream subscriptions. 45 dids, err := db.GetAllDIDs() 46 if err != nil { 47 return nil, fmt.Errorf("failed to get all DIDs: %w", err) 48 } 49 if len(dids) > 0 { 50 h.knotInitialized = true 51 close(h.init) 52 h.js.UpdateDids(dids) 53 } 54 55 r.Get("/", h.Index) 56 r.Route("/{did}", func(r chi.Router) { 57 // Repo routes 58 r.Route("/{name}", func(r chi.Router) { 59 r.Get("/", h.RepoIndex) 60 r.Get("/info/refs", h.InfoRefs) 61 r.Post("/git-upload-pack", h.UploadPack) 62 63 r.Route("/tree/{ref}", func(r chi.Router) { 64 r.Get("/*", h.RepoTree) 65 }) 66 67 r.Route("/blob/{ref}", func(r chi.Router) { 68 r.Get("/*", h.FileContent) 69 }) 70 71 r.Get("/log/{ref}", h.Log) 72 r.Get("/archive/{file}", h.Archive) 73 r.Get("/commit/{ref}", h.Diff) 74 r.Get("/refs/", h.Refs) 75 }) 76 }) 77 78 // Create a new repository. 79 r.Route("/repo", func(r chi.Router) { 80 r.Use(h.VerifySignature) 81 r.Put("/new", h.NewRepo) 82 }) 83 84 // Initialize the knot with an owner and public key. 85 r.With(h.VerifySignature).Post("/init", h.Init) 86 87 // Health check. Used for two-way verification with appview. 88 r.With(h.VerifySignature).Get("/health", h.Health) 89 90 // All public keys on the knot. 91 r.Get("/keys", h.Keys) 92 93 return r, nil 94} 95 96func (h *Handle) StartJetstream(ctx context.Context) error { 97 colections := []string{tangled.PublicKeyNSID} 98 dids := []string{} 99 100 h.js = jsclient.NewJetstreamClient(colections, dids) 101 messages, err := h.js.ReadJetstream(ctx) 102 if err != nil { 103 return fmt.Errorf("failed to read from jetstream: %w", err) 104 } 105 106 go func() { 107 log.Println("waiting for knot to be initialized") 108 <-h.init 109 log.Println("initalized jetstream watcher") 110 111 for msg := range messages { 112 var data map[string]interface{} 113 if err := json.Unmarshal(msg, &data); err != nil { 114 log.Printf("error unmarshaling message: %v", err) 115 continue 116 } 117 118 if kind, ok := data["kind"].(string); ok && kind == "commit" { 119 commit := data["commit"].(map[string]interface{}) 120 121 switch commit["collection"].(string) { 122 case tangled.PublicKeyNSID: 123 did := data["did"].(string) 124 record := commit["record"].(map[string]interface{}) 125 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 126 log.Printf("failed to add public key: %v", err) 127 } else { 128 log.Printf("added public key from firehose: %s", data["did"]) 129 } 130 default: 131 } 132 } 133 134 } 135 }() 136 137 return nil 138}