this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "net/http" 9 "time" 10 11 "github.com/go-chi/chi/v5" 12 tangled "github.com/sotangled/tangled/api/tangled" 13 "github.com/sotangled/tangled/knotserver/config" 14 "github.com/sotangled/tangled/knotserver/db" 15 "github.com/sotangled/tangled/knotserver/jsclient" 16 "github.com/sotangled/tangled/rbac" 17) 18 19const ( 20 ThisServer = "thisserver" // resource identifier for rbac enforcement 21) 22 23type Handle struct { 24 c *config.Config 25 db *db.DB 26 js *jsclient.JetstreamClient 27 e *rbac.Enforcer 28 29 // init is a channel that is closed when the knot has been initailized 30 // i.e. when the first user (knot owner) has been added. 31 init chan struct{} 32 knotInitialized bool 33} 34 35func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer) (http.Handler, error) { 36 r := chi.NewRouter() 37 38 h := Handle{ 39 c: c, 40 db: db, 41 e: e, 42 init: make(chan struct{}), 43 } 44 45 err := e.AddDomain(ThisServer) 46 if err != nil { 47 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 48 } 49 50 err = h.StartJetstream(ctx) 51 if err != nil { 52 return nil, fmt.Errorf("failed to start jetstream: %w", err) 53 } 54 55 // Check if the knot knows about any Dids; 56 // if it does, it is already initialized and we can repopulate the 57 // Jetstream subscriptions. 58 dids, err := db.GetAllDids() 59 if err != nil { 60 return nil, fmt.Errorf("failed to get all Dids: %w", err) 61 } 62 if len(dids) > 0 { 63 h.knotInitialized = true 64 close(h.init) 65 h.js.UpdateDids(dids) 66 } 67 68 r.Get("/", h.Index) 69 r.Route("/{did}", func(r chi.Router) { 70 // Repo routes 71 r.Route("/{name}", func(r chi.Router) { 72 r.Get("/", h.RepoIndex) 73 r.Get("/info/refs", h.InfoRefs) 74 r.Post("/git-upload-pack", h.UploadPack) 75 76 r.Route("/tree/{ref}", func(r chi.Router) { 77 r.Get("/*", h.RepoTree) 78 }) 79 80 r.Route("/blob/{ref}", func(r chi.Router) { 81 r.Get("/*", h.FileContent) 82 }) 83 84 r.Get("/log/{ref}", h.Log) 85 r.Get("/archive/{file}", h.Archive) 86 r.Get("/commit/{ref}", h.Diff) 87 r.Get("/refs/", h.Refs) 88 }) 89 }) 90 91 // Create a new repository. 92 r.Route("/repo", func(r chi.Router) { 93 r.Use(h.VerifySignature) 94 r.Put("/new", h.NewRepo) 95 }) 96 97 // Initialize the knot with an owner and public key. 98 r.With(h.VerifySignature).Post("/init", h.Init) 99 100 // Health check. Used for two-way verification with appview. 101 r.With(h.VerifySignature).Get("/health", h.Health) 102 103 // All public keys on the knot. 104 r.Get("/keys", h.Keys) 105 106 return r, nil 107} 108 109func (h *Handle) StartJetstream(ctx context.Context) error { 110 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 111 dids := []string{} 112 113 var lastTimeUs int64 114 var err error 115 lastTimeUs, err = h.db.GetLastTimeUs() 116 if err != nil { 117 log.Println("couldn't get last time us, starting from now") 118 lastTimeUs = time.Now().UnixMicro() 119 } 120 // If last time is older than a week, start from now 121 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 122 lastTimeUs = time.Now().UnixMicro() 123 log.Printf("last time us is older than a week. discarding that and starting from now.") 124 err = h.db.SaveLastTimeUs(lastTimeUs) 125 if err != nil { 126 log.Println("failed to save last time us") 127 } 128 } 129 130 log.Printf("found last time_us %d", lastTimeUs) 131 132 h.js = jsclient.NewJetstreamClient(collections, dids) 133 messages, err := h.js.ReadJetstream(ctx, lastTimeUs) 134 if err != nil { 135 return fmt.Errorf("failed to read from jetstream: %w", err) 136 } 137 138 go func() { 139 log.Println("waiting for knot to be initialized") 140 <-h.init 141 log.Println("initalized jetstream watcher") 142 143 for msg := range messages { 144 var data map[string]interface{} 145 if err := json.Unmarshal(msg, &data); err != nil { 146 log.Printf("error unmarshaling message: %v", err) 147 continue 148 } 149 150 if kind, ok := data["kind"].(string); ok && kind == "commit" { 151 commit := data["commit"].(map[string]interface{}) 152 153 switch commit["collection"].(string) { 154 case tangled.PublicKeyNSID: 155 did := data["did"].(string) 156 record := commit["record"].(map[string]interface{}) 157 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 158 log.Printf("failed to add public key: %v", err) 159 } else { 160 log.Printf("added public key from firehose: %s", data["did"]) 161 } 162 case tangled.KnotMemberNSID: 163 did := data["did"].(string) 164 record := commit["record"].(map[string]interface{}) 165 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 166 if err != nil || !ok { 167 log.Printf("failed to add member from did %s", did) 168 } else { 169 log.Printf("adding member") 170 h.e.AddMember(ThisServer, record["member"].(string)) 171 } 172 default: 173 } 174 175 lastTimeUs := int64(data["time_us"].(float64)) 176 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 177 log.Printf("failed to save last time us: %v", err) 178 } 179 } 180 181 } 182 }() 183 184 return nil 185}