this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 tangled "github.com/sotangled/tangled/api/tangled" 12 "github.com/sotangled/tangled/knotserver/config" 13 "github.com/sotangled/tangled/knotserver/db" 14 "github.com/sotangled/tangled/knotserver/jsclient" 15 "github.com/sotangled/tangled/rbac" 16) 17 18const ( 19 ThisServer = "thisserver" // resource identifier for rbac enforcement 20) 21 22type Handle struct { 23 c *config.Config 24 db *db.DB 25 js *jsclient.JetstreamClient 26 e *rbac.Enforcer 27 28 // init is a channel that is closed when the knot has been initailized 29 // i.e. when the first user (knot owner) has been added. 30 init chan struct{} 31 knotInitialized bool 32} 33 34func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer) (http.Handler, error) { 35 r := chi.NewRouter() 36 37 h := Handle{ 38 c: c, 39 db: db, 40 e: e, 41 init: make(chan struct{}), 42 } 43 44 err := e.AddDomain(ThisServer) 45 if err != nil { 46 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 47 } 48 49 err = h.StartJetstream(ctx) 50 if err != nil { 51 return nil, fmt.Errorf("failed to start jetstream: %w", err) 52 } 53 54 // Check if the knot knows about any Dids; 55 // if it does, it is already initialized and we can repopulate the 56 // Jetstream subscriptions. 57 dids, err := db.GetAllDids() 58 if err != nil { 59 return nil, fmt.Errorf("failed to get all Dids: %w", err) 60 } 61 if len(dids) > 0 { 62 h.knotInitialized = true 63 close(h.init) 64 h.js.UpdateDids(dids) 65 } 66 67 r.Get("/", h.Index) 68 r.Route("/{did}", func(r chi.Router) { 69 // Repo routes 70 r.Route("/{name}", func(r chi.Router) { 71 r.Get("/", h.RepoIndex) 72 r.Get("/info/refs", h.InfoRefs) 73 r.Post("/git-upload-pack", h.UploadPack) 74 75 r.Route("/tree/{ref}", func(r chi.Router) { 76 r.Get("/*", h.RepoTree) 77 }) 78 79 r.Route("/blob/{ref}", func(r chi.Router) { 80 r.Get("/*", h.FileContent) 81 }) 82 83 r.Get("/log/{ref}", h.Log) 84 r.Get("/archive/{file}", h.Archive) 85 r.Get("/commit/{ref}", h.Diff) 86 r.Get("/refs/", h.Refs) 87 }) 88 }) 89 90 // Create a new repository. 91 r.Route("/repo", func(r chi.Router) { 92 r.Use(h.VerifySignature) 93 r.Put("/new", h.NewRepo) 94 }) 95 96 // Initialize the knot with an owner and public key. 97 r.With(h.VerifySignature).Post("/init", h.Init) 98 99 // Health check. Used for two-way verification with appview. 100 r.With(h.VerifySignature).Get("/health", h.Health) 101 102 // All public keys on the knot. 103 r.Get("/keys", h.Keys) 104 105 return r, nil 106} 107 108func (h *Handle) StartJetstream(ctx context.Context) error { 109 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 110 dids := []string{} 111 112 h.js = jsclient.NewJetstreamClient(collections, dids) 113 messages, err := h.js.ReadJetstream(ctx) 114 if err != nil { 115 return fmt.Errorf("failed to read from jetstream: %w", err) 116 } 117 118 go func() { 119 log.Println("waiting for knot to be initialized") 120 <-h.init 121 log.Println("initalized jetstream watcher") 122 123 for msg := range messages { 124 var data map[string]interface{} 125 if err := json.Unmarshal(msg, &data); err != nil { 126 log.Printf("error unmarshaling message: %v", err) 127 continue 128 } 129 130 if kind, ok := data["kind"].(string); ok && kind == "commit" { 131 commit := data["commit"].(map[string]interface{}) 132 133 switch commit["collection"].(string) { 134 case tangled.PublicKeyNSID: 135 did := data["did"].(string) 136 record := commit["record"].(map[string]interface{}) 137 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 138 log.Printf("failed to add public key: %v", err) 139 } else { 140 log.Printf("added public key from firehose: %s", data["did"]) 141 } 142 case tangled.KnotMemberNSID: 143 did := data["did"].(string) 144 record := commit["record"].(map[string]interface{}) 145 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 146 if err != nil || !ok { 147 log.Printf("failed to add member from did %s", did) 148 } else { 149 log.Printf("adding member") 150 h.e.AddMember(ThisServer, record["member"].(string)) 151 } 152 default: 153 } 154 } 155 156 } 157 }() 158 159 return nil 160}