this repo has no description

knotserver: /init and /user endpoints

When the knot is initialized, the h.init channel is closed signalling
the Jetstream watcher to begin, with the owner DID in filter. There's no
null filter so starting it without any DIDs will give us events for all
DIDs. /init also checks if h.knotInitialized is true, and will 409
Conflict if /init is called again.

PUT /user is similar, but will only trigger a call to UpdateDids.

Changed files
+118 -10
knotserver
+1 -2
flake.nix
··· 45 45 nativeBuildInputs = [ 46 46 pkgs.go 47 47 pkgs.air 48 - pkgs.templ 49 48 pkgs.gopls 50 49 pkgs.httpie 51 50 pkgs.indigo-lexgen ··· 54 53 ]; 55 54 }; 56 55 }); 57 - apps = forAllSystems (system: 56 + apps = forAllSystems (system: 58 57 let 59 58 pkgs = nixpkgsFor."${system}"; 60 59 air-watcher = name: pkgs.writeShellScriptBin "run"
+6
knotserver/db/init.go
··· 25 25 created timestamp default current_timestamp, 26 26 unique(did, name, key) 27 27 ); 28 + create table if not exists users ( 29 + id integer primary key autoincrement, 30 + did text not null, 31 + unique(did), 32 + foreign key (did) references public_keys(did) on delete cascade 33 + ); 28 34 create table if not exists repos ( 29 35 id integer primary key autoincrement, 30 36 did text not null,
+2 -2
knotserver/db/pubkeys.go
··· 11 11 tangled.PublicKey 12 12 } 13 13 14 - func (d *DB) AddPublicKeyFromRecord(recordIface map[string]interface{}) error { 14 + func (d *DB) AddPublicKeyFromRecord(did string, recordIface map[string]interface{}) error { 15 15 record := make(map[string]string) 16 16 for k, v := range recordIface { 17 17 if str, ok := v.(string); ok { ··· 20 20 } 21 21 22 22 pk := PublicKey{ 23 - Did: record["did"], 23 + Did: did, 24 24 } 25 25 pk.Name = record["name"] 26 26 pk.Key = record["key"]
+11
knotserver/db/users.go
··· 1 + package db 2 + 3 + func (d *DB) AddUser(did string) error { 4 + _, err := d.db.Exec(`insert into users (did) values (?)`, did) 5 + return err 6 + } 7 + 8 + func (d *DB) RemoveUser(did string) error { 9 + _, err := d.db.Exec(`delete from users where did = ?`, did) 10 + return err 11 + }
+21 -4
knotserver/handler.go
··· 18 18 c *config.Config 19 19 db *db.DB 20 20 js *jsclient.JetstreamClient 21 + 22 + // init is a channel that is closed when the knot has been initailized 23 + // i.e. when the first user (knot owner) has been added. 24 + init chan struct{} 25 + knotInitialized bool 21 26 } 22 27 23 28 func Setup(ctx context.Context, c *config.Config, db *db.DB) (http.Handler, error) { 24 29 r := chi.NewRouter() 25 30 26 31 h := Handle{ 27 - c: c, 28 - db: db, 32 + c: c, 33 + db: db, 34 + init: make(chan struct{}), 29 35 } 30 36 31 37 err := h.StartJetstream(ctx) ··· 33 39 return nil, fmt.Errorf("failed to start jetstream: %w", err) 34 40 } 35 41 42 + // TODO: close this channel and set h.knotInitialized *only after* 43 + // checking if we have an owner. 44 + close(h.init) 45 + h.knotInitialized = true 46 + 36 47 r.Get("/", h.Index) 37 48 r.Route("/{did}", func(r chi.Router) { 38 49 // Repo routes ··· 63 74 }) 64 75 65 76 // Add a new user to the knot 66 - // r.With(h.VerifySignature).Put("/user", h.AddUser) 77 + r.With(h.VerifySignature).Put("/user", h.AddUser) 78 + r.With(h.VerifySignature).Post("/init", h.Init) 67 79 68 80 // Health check. Used for two-way verification with appview. 69 81 r.With(h.VerifySignature).Get("/health", h.Health) ··· 85 97 } 86 98 87 99 go func() { 100 + log.Println("waiting for knot to be initialized") 101 + <-h.init 102 + log.Println("initalized jetstream watcher") 103 + 88 104 for msg := range messages { 89 105 var data map[string]interface{} 90 106 if err := json.Unmarshal(msg, &data); err != nil { ··· 97 113 98 114 switch commit["collection"].(string) { 99 115 case tangled.PublicKeyNSID: 116 + did := data["did"].(string) 100 117 record := commit["record"].(map[string]interface{}) 101 - if err := h.db.AddPublicKeyFromRecord(record); err != nil { 118 + if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 102 119 log.Printf("failed to add public key: %v", err) 103 120 } 104 121 log.Printf("added public key from firehose: %s", data["did"])
-2
knotserver/jsclient/jetstream.go
··· 100 100 queryParams := j.buildQueryParams(cursor) 101 101 u := j.buildWebsocketURL(queryParams) 102 102 103 - log.Printf("connecting to jetstream at: %s", u.String()) 104 - 105 103 dialer := websocket.Dialer{ 106 104 HandshakeTimeout: 10 * time.Second, 107 105 }
+77
knotserver/routes.go
··· 391 391 w.WriteHeader(http.StatusNoContent) 392 392 } 393 393 394 + func (h *Handle) AddUser(w http.ResponseWriter, r *http.Request) { 395 + data := struct { 396 + DID string `json:"did"` 397 + PublicKey string `json:"pubkey"` 398 + }{} 399 + 400 + if err := json.NewDecoder(r.Body).Decode(&data); err != nil { 401 + writeError(w, "invalid request body", http.StatusBadRequest) 402 + return 403 + } 404 + 405 + did := data.DID 406 + key := data.PublicKey 407 + 408 + if err := h.db.AddUser(did); err == nil { 409 + pk := db.PublicKey{ 410 + Did: did, 411 + } 412 + pk.Key = key 413 + pk.Name = "default" 414 + err := h.db.AddPublicKey(pk) 415 + if err != nil { 416 + writeError(w, err.Error(), http.StatusInternalServerError) 417 + return 418 + } 419 + } else { 420 + writeError(w, err.Error(), http.StatusInternalServerError) 421 + return 422 + } 423 + 424 + h.js.UpdateDids([]string{did}) 425 + 426 + w.WriteHeader(http.StatusNoContent) 427 + } 428 + 429 + // TODO: make this set the initial user as the owner 430 + func (h *Handle) Init(w http.ResponseWriter, r *http.Request) { 431 + if h.knotInitialized { 432 + writeError(w, "knot already initialized", http.StatusConflict) 433 + return 434 + } 435 + 436 + data := struct { 437 + DID string `json:"did"` 438 + PublicKey string `json:"pubkey"` 439 + }{} 440 + 441 + if err := json.NewDecoder(r.Body).Decode(&data); err != nil { 442 + writeError(w, "invalid request body", http.StatusBadRequest) 443 + return 444 + } 445 + 446 + did := data.DID 447 + key := data.PublicKey 448 + 449 + if err := h.db.AddUser(did); err == nil { 450 + pk := db.PublicKey{ 451 + Did: did, 452 + } 453 + pk.Key = key 454 + pk.Name = "default" 455 + err := h.db.AddPublicKey(pk) 456 + if err != nil { 457 + writeError(w, err.Error(), http.StatusInternalServerError) 458 + return 459 + } 460 + } else { 461 + writeError(w, err.Error(), http.StatusInternalServerError) 462 + return 463 + } 464 + 465 + h.js.UpdateDids([]string{did}) 466 + // Signal that the knot is ready 467 + close(h.init) 468 + w.WriteHeader(http.StatusNoContent) 469 + } 470 + 394 471 func (h *Handle) Health(w http.ResponseWriter, r *http.Request) { 395 472 log.Println("got health check") 396 473 mac := hmac.New(sha256.New, []byte(h.c.Server.Secret))