Monorepo for Tangled

wip: knotmirror: introduce knotmirror

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me fc083c44 5934e76d

verified
+2107 -38
+1
.gitignore
··· 19 19 # Created if following hacking.md 20 20 genjwks.out 21 21 /nix/vm-data 22 + /mirror
+51
cmd/knotmirror/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "os" 7 + "os/signal" 8 + "syscall" 9 + 10 + "github.com/carlmjohnson/versioninfo" 11 + "github.com/urfave/cli/v3" 12 + "tangled.org/core/knotmirror" 13 + "tangled.org/core/log" 14 + ) 15 + 16 + func main() { 17 + if err := run(os.Args); err != nil { 18 + slog.Error("error running knotmirror", "err", err) 19 + os.Exit(-1) 20 + } 21 + } 22 + 23 + func run(args []string) error { 24 + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 25 + defer cancel() 26 + 27 + logger := log.New("knotmirror") 28 + slog.SetDefault(logger) 29 + ctx = log.IntoContext(ctx, logger) 30 + 31 + app := cli.Command{ 32 + Name: "knotmirror", 33 + Usage: "knot mirroring service", 34 + Version: versioninfo.Short(), 35 + } 36 + app.Flags = []cli.Flag{} 37 + app.Commands = []*cli.Command{ 38 + { 39 + Name: "serve", 40 + Usage: "run the knotmirror daemon", 41 + Action: runKnotMirror, 42 + Flags: []cli.Flag{}, 43 + }, 44 + } 45 + return app.Run(ctx, args) 46 + } 47 + 48 + func runKnotMirror(ctx context.Context, cmd *cli.Command) error { 49 + // TODO: generate Config from arguments & pass down to Run() 50 + return knotmirror.Run(ctx) 51 + }
+3 -1
flake.nix
··· 99 99 bluesky-jetstream = self.callPackage ./nix/pkgs/bluesky-jetstream.nix {}; 100 100 bluesky-relay = self.callPackage ./nix/pkgs/bluesky-relay.nix {}; 101 101 tap = self.callPackage ./nix/pkgs/tap.nix {}; 102 + knotmirror = self.callPackage ./nix/pkgs/knot-mirror.nix {}; 102 103 }); 103 104 in { 104 105 overlays.default = final: prev: { 105 - inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly did-method-plc bluesky-jetstream bluesky-relay tap; 106 + inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly did-method-plc bluesky-jetstream bluesky-relay tap knotmirror; 106 107 }; 107 108 108 109 packages = forAllSystems (system: let ··· 191 192 pkgs.coreutils # for those of us who are on systems that use busybox (alpine) 192 193 packages'.lexgen 193 194 packages'.treefmt-wrapper 195 + packages'.tap 194 196 ]; 195 197 shellHook = '' 196 198 mkdir -p appview/pages/static
+8 -7
go.mod
··· 37 37 github.com/microcosm-cc/bluemonday v1.0.27 38 38 github.com/openbao/openbao/api/v2 v2.3.0 39 39 github.com/posthog/posthog-go v1.5.5 40 + github.com/prometheus/client_golang v1.23.2 40 41 github.com/redis/go-redis/v9 v9.7.3 41 42 github.com/resend/resend-go/v2 v2.15.0 42 43 github.com/sethvargo/go-envconfig v1.1.0 43 44 github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c 44 45 github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef 45 - github.com/stretchr/testify v1.10.0 46 + github.com/stretchr/testify v1.11.1 46 47 github.com/urfave/cli/v3 v3.3.3 47 48 github.com/whyrusleeping/cbor-gen v0.3.1 48 49 github.com/yuin/goldmark v1.7.13 49 50 github.com/yuin/goldmark-emoji v1.0.6 50 51 github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc 51 52 gitlab.com/staticnoise/goldmark-callout v0.0.0-20240609120641-6366b799e4ab 52 - golang.org/x/crypto v0.40.0 53 + golang.org/x/crypto v0.41.0 53 54 golang.org/x/image v0.31.0 54 - golang.org/x/net v0.42.0 55 + golang.org/x/net v0.43.0 55 56 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 56 57 gopkg.in/yaml.v3 v3.0.1 57 58 ) ··· 178 179 github.com/pkg/errors v0.9.1 // indirect 179 180 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect 180 181 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 181 - github.com/prometheus/client_golang v1.22.0 // indirect 182 182 github.com/prometheus/client_model v0.6.2 // indirect 183 - github.com/prometheus/common v0.64.0 // indirect 183 + github.com/prometheus/common v0.66.1 // indirect 184 184 github.com/prometheus/procfs v0.16.1 // indirect 185 185 github.com/rivo/uniseg v0.4.7 // indirect 186 186 github.com/ryanuber/go-glob v1.0.0 // indirect ··· 203 203 go.uber.org/atomic v1.11.0 // indirect 204 204 go.uber.org/multierr v1.11.0 // indirect 205 205 go.uber.org/zap v1.27.0 // indirect 206 + go.yaml.in/yaml/v2 v2.4.2 // indirect 206 207 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect 207 208 golang.org/x/sync v0.17.0 // indirect 208 - golang.org/x/sys v0.34.0 // indirect 209 + golang.org/x/sys v0.35.0 // indirect 209 210 golang.org/x/text v0.29.0 // indirect 210 211 golang.org/x/time v0.12.0 // indirect 211 212 google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect 212 213 google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect 213 214 google.golang.org/grpc v1.73.0 // indirect 214 - google.golang.org/protobuf v1.36.6 // indirect 215 + google.golang.org/protobuf v1.36.8 // indirect 215 216 gopkg.in/fsnotify.v1 v1.4.7 // indirect 216 217 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect 217 218 gopkg.in/warnings.v0 v0.1.2 // indirect
+20 -16
go.sum
··· 330 330 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= 331 331 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= 332 332 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 333 + github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= 334 + github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= 333 335 github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= 334 336 github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= 335 337 github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= ··· 431 433 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= 432 434 github.com/posthog/posthog-go v1.5.5 h1:2o3j7IrHbTIfxRtj4MPaXKeimuTYg49onNzNBZbwksM= 433 435 github.com/posthog/posthog-go v1.5.5/go.mod h1:3RqUmSnPuwmeVj/GYrS75wNGqcAKdpODiwc83xZWgdE= 434 - github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= 435 - github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= 436 + github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= 437 + github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= 436 438 github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= 437 439 github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= 438 - github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQPGO4= 439 - github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= 440 + github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= 441 + github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= 440 442 github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= 441 443 github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= 442 444 github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= ··· 482 484 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 483 485 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= 484 486 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= 485 - github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= 486 - github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 487 + github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= 488 + github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= 487 489 github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= 488 490 github.com/urfave/cli/v3 v3.3.3 h1:byCBaVdIXuLPIDm5CYZRVG6NvT7tv1ECqdU4YzlEa3I= 489 491 github.com/urfave/cli/v3 v3.3.3/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo= ··· 554 556 go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= 555 557 go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= 556 558 go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= 559 + go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= 560 + go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= 557 561 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= 558 562 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 559 563 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= ··· 561 565 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= 562 566 golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= 563 567 golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= 564 - golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= 565 - golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= 568 + golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= 569 + golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= 566 570 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= 567 571 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= 568 572 golang.org/x/image v0.31.0 h1:mLChjE2MV6g1S7oqbXC0/UcKijjm5fnJLUYKIYrLESA= ··· 597 601 golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= 598 602 golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= 599 603 golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= 600 - golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= 601 - golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= 604 + golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= 605 + golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= 602 606 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 603 607 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 604 608 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= ··· 638 642 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 639 643 golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 640 644 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 641 - golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= 642 - golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= 645 + golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= 646 + golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= 643 647 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= 644 648 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= 645 649 golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= ··· 649 653 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= 650 654 golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= 651 655 golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= 652 - golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= 653 - golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= 656 + golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= 657 + golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= 654 658 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= 655 659 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 656 660 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= ··· 703 707 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= 704 708 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= 705 709 google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= 706 - google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= 707 - google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= 710 + google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= 711 + google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= 708 712 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 709 713 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 710 714 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+126
knotmirror/adminpage.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "database/sql" 5 + "embed" 6 + "html/template" 7 + "log/slog" 8 + "net/http" 9 + "strconv" 10 + "time" 11 + 12 + "github.com/go-chi/chi/v5" 13 + "tangled.org/core/appview/pagination" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/models" 16 + "tangled.org/core/orm" 17 + ) 18 + 19 + //go:embed templates/*.html 20 + var templateFS embed.FS 21 + 22 + const repoPageSize = 20 23 + 24 + type AdminServer struct { 25 + db *sql.DB 26 + } 27 + 28 + func NewAdminServer(database *sql.DB) *AdminServer { 29 + return &AdminServer{db: database} 30 + } 31 + 32 + func (s *AdminServer) Router() http.Handler { 33 + r := chi.NewRouter() 34 + r.Get("/repos", s.handleRepos()) 35 + r.Get("/hosts", s.handleHosts()) 36 + return r 37 + } 38 + 39 + func funcmap() template.FuncMap { 40 + return template.FuncMap{ 41 + "add": func(a, b int) int { return a + b }, 42 + "sub": func(a, b int) int { return a - b }, 43 + "readt": func(ts int64) string { 44 + if ts == 0 { 45 + return "n/a" 46 + } 47 + return time.Unix(ts, 0).Format("2006-01-02 15:04") 48 + }, 49 + } 50 + } 51 + 52 + func (s *AdminServer) handleRepos() http.HandlerFunc { 53 + // TODO: prepare template 54 + tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/repos.html")) 55 + return func(w http.ResponseWriter, r *http.Request) { 56 + pageNum, _ := strconv.Atoi(r.URL.Query().Get("page")) 57 + if pageNum < 1 { 58 + pageNum = 1 59 + } 60 + var ( 61 + did = r.URL.Query().Get("did") 62 + knot = r.URL.Query().Get("knot") 63 + state = r.URL.Query().Get("state") 64 + ) 65 + 66 + page := pagination.Page{ 67 + Offset: (pageNum - 1) * repoPageSize, 68 + Limit: repoPageSize, 69 + } 70 + var filters []orm.Filter 71 + 72 + if did != "" { 73 + filters = append(filters, orm.FilterEq("did", did)) 74 + } 75 + if knot != "" { 76 + filters = append(filters, orm.FilterEq("knot_domain", knot)) 77 + } 78 + if state != "" { 79 + filters = append(filters, orm.FilterEq("state", state)) 80 + } 81 + 82 + repos, err := db.ListRepos(r.Context(), s.db, page, filters...) 83 + if err != nil { 84 + http.Error(w, err.Error(), http.StatusInternalServerError) 85 + } 86 + counts, err := db.GetRepoCountsByState(r.Context(), s.db) 87 + if err != nil { 88 + http.Error(w, err.Error(), http.StatusInternalServerError) 89 + } 90 + err = tpl.ExecuteTemplate(w, "base", map[string]any{ 91 + "Repos": repos, 92 + "RepoCounts": counts, 93 + "Page": pageNum, 94 + "FilterByDid": did, 95 + "FilterByKnot": knot, 96 + "FilterByState": models.RepoState(state), 97 + }) 98 + if err != nil { 99 + slog.Error("failed to render", "err", err) 100 + } 101 + } 102 + } 103 + 104 + func (s *AdminServer) handleHosts() http.HandlerFunc { 105 + tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/hosts.html")) 106 + return func(w http.ResponseWriter, r *http.Request) { 107 + var status = r.URL.Query().Get("status") 108 + 109 + var filters []orm.Filter 110 + if status != "" { 111 + filters = append(filters, orm.FilterEq("status", status)) 112 + } 113 + 114 + hosts, err := db.ListHosts(r.Context(), s.db, filters...) 115 + if err != nil { 116 + http.Error(w, err.Error(), http.StatusInternalServerError) 117 + } 118 + err = tpl.ExecuteTemplate(w, "base", map[string]any{ 119 + "Hosts": hosts, 120 + "FilterByStatus": models.HostStatus(status), 121 + }) 122 + if err != nil { 123 + slog.Error("failed to render", "err", err) 124 + } 125 + } 126 + }
+33
knotmirror/config/config.go
··· 1 + package config 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "github.com/sethvargo/go-envconfig" 8 + ) 9 + 10 + type Config struct { 11 + TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 + DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 13 + KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not schema is not specified 14 + GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"` 15 + GitRepoFetchTimeout time.Duration `env:"MIRROR_GIT_FETCH_TIMEOUT, default=600s"` 16 + ResyncParallelism int `env:"MIRROR_RESYNC_PARALLELISM, default=5"` 17 + Slurper SlurperConfig `env:",prefix=MIRROR_SLURPER_"` 18 + MetricsListen string `env:"MIRROR_METRICS_LISTEN, default=:7100"` 19 + AdminListen string `env:"MIRROR_ADMIN_LISTEN, default=:7200"` 20 + } 21 + 22 + type SlurperConfig struct { 23 + PersistCursorPeriod time.Duration `env:"PERSIST_CURSOR_PERIOD, default=4s"` 24 + ConcurrencyPerHost int `env:"CONCURRENCY, default=40"` 25 + } 26 + 27 + func Load(ctx context.Context) (*Config, error) { 28 + var cfg Config 29 + if err := envconfig.Process(ctx, &cfg); err != nil { 30 + return nil, err 31 + } 32 + return &cfg, nil 33 + }
+25
knotmirror/crawler.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "log/slog" 7 + 8 + "tangled.org/core/log" 9 + ) 10 + 11 + type Crawler struct { 12 + logger *slog.Logger 13 + db *sql.DB 14 + } 15 + 16 + func NewCrawler(l *slog.Logger, db *sql.DB) *Crawler { 17 + return &Crawler{ 18 + logger: log.SubLogger(l, "crawler"), 19 + db: db, 20 + } 21 + } 22 + 23 + func (c *Crawler) Start(ctx context.Context) { 24 + // TODO: repository crawler 25 + }
+68
knotmirror/db/db.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "strings" 8 + _ "github.com/mattn/go-sqlite3" 9 + ) 10 + 11 + func Make(ctx context.Context, dbPath string) (*sql.DB, error) { 12 + // https://github.com/mattn/go-sqlite3#connection-string 13 + opts := []string{ 14 + "_foreign_keys=1", 15 + "_journal_mode=WAL", 16 + "_synchronous=NORMAL", 17 + "_auto_vacuum=incremental", 18 + } 19 + 20 + db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 21 + if err != nil { 22 + return nil, err 23 + } 24 + 25 + conn, err := db.Conn(ctx) 26 + if err != nil { 27 + return nil, err 28 + } 29 + defer conn.Close() 30 + 31 + _, err = conn.ExecContext(ctx, ` 32 + create table if not exists repos ( 33 + did text not null, 34 + rkey text not null, 35 + at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored, 36 + cid text not null, 37 + 38 + -- record content 39 + name text not null, 40 + knot_domain text not null, 41 + 42 + -- sync data 43 + git_rev text not null, 44 + repo_sha text not null, 45 + state text not null default 'pending', 46 + error_msg text, 47 + retry_count integer not null default 0, 48 + retry_after integer not null default 0, 49 + 50 + unique(did, rkey) 51 + ); 52 + 53 + -- knot hosts 54 + create table if not exists hosts ( 55 + hostname text not null, 56 + no_ssl integer not null default 0, 57 + status text not null default 'active', 58 + last_seq integer not null default -1, 59 + 60 + unique(hostname) 61 + ); 62 + `) 63 + if err != nil { 64 + return nil, fmt.Errorf("initializing db schema: %w", err) 65 + } 66 + 67 + return db, nil 68 + }
+116
knotmirror/db/hosts.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log" 9 + "strings" 10 + 11 + "tangled.org/core/knotmirror/models" 12 + "tangled.org/core/orm" 13 + ) 14 + 15 + func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error { 16 + if _, err := e.ExecContext(ctx, 17 + `insert into hosts (hostname, no_ssl, status, last_seq) 18 + values (?, ?, ?, ?) 19 + on conflict(hostname) do update set 20 + no_ssl = excluded.no_ssl, 21 + status = excluded.status, 22 + last_seq = excluded.last_seq 23 + `, 24 + host.Hostname, 25 + host.NoSSL, 26 + host.Status, 27 + host.LastSeq, 28 + ); err != nil { 29 + return fmt.Errorf("upserting host: %w", err) 30 + }; 31 + return nil 32 + } 33 + 34 + func GetHost(ctx context.Context, e *sql.DB, hostname string) (*models.Host, error) { 35 + var host models.Host 36 + if err := e.QueryRowContext(ctx, 37 + `select hostname, no_ssl, status, last_seq 38 + from hosts where hostname = ?`, 39 + hostname, 40 + ).Scan( 41 + &host.Hostname, 42 + &host.NoSSL, 43 + &host.Status, 44 + &host.LastSeq, 45 + ); err != nil { 46 + if errors.Is(err, sql.ErrNoRows) { 47 + return nil, nil 48 + } 49 + return nil, err 50 + }; 51 + return &host, nil 52 + } 53 + 54 + func StoreCursors(ctx context.Context, e *sql.DB, cursors []models.HostCursor) error { 55 + tx, err := e.BeginTx(ctx, nil) 56 + if err != nil { 57 + return fmt.Errorf("starting transaction: %w", err) 58 + } 59 + defer tx.Rollback() 60 + for _, cur := range cursors { 61 + if cur.LastSeq <= 0 { 62 + continue 63 + } 64 + if _, err := tx.ExecContext(ctx, 65 + `update hosts set last_seq = ? where hostname = ?`, 66 + cur.LastSeq, 67 + cur.Hostname, 68 + ); err != nil { 69 + log.Println("failed to persist host cursor", "host:", cur.Hostname, "lastSeq", cur.LastSeq) 70 + } 71 + } 72 + return tx.Commit() 73 + } 74 + 75 + func ListHosts(ctx context.Context, e *sql.DB, filters ...orm.Filter) ([]models.Host, error) { 76 + var conditions []string 77 + var args []any 78 + 79 + for _, filter := range filters { 80 + conditions = append(conditions, filter.Condition()) 81 + args = append(args, filter.Arg()...) 82 + } 83 + 84 + whereClause := "" 85 + if len(conditions) > 0 { 86 + whereClause = " where " + strings.Join(conditions, " and ") 87 + } 88 + 89 + rows, err := e.QueryContext(ctx, 90 + `select hostname, no_ssl, status, last_seq 91 + from hosts` + whereClause, 92 + args..., 93 + ) 94 + if err != nil { 95 + return nil, fmt.Errorf("querying hosts: %w", err) 96 + } 97 + defer rows.Close() 98 + 99 + var hosts []models.Host 100 + for rows.Next() { 101 + var host models.Host 102 + if err := rows.Scan( 103 + &host.Hostname, 104 + &host.NoSSL, 105 + &host.Status, 106 + &host.LastSeq, 107 + ); err != nil { 108 + return nil, fmt.Errorf("scanning row: %w", err) 109 + } 110 + hosts = append(hosts, host) 111 + } 112 + if err := rows.Err(); err != nil { 113 + return nil, fmt.Errorf("scanning rows: %w ", err) 114 + } 115 + return hosts, nil 116 + }
+265
knotmirror/db/repos.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/appview/pagination" 11 + "tangled.org/core/knotmirror/models" 12 + "tangled.org/core/orm" 13 + ) 14 + 15 + func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 16 + if _, err := e.ExecContext(ctx, 17 + `insert into repos (did, rkey, cid, name, knot_domain) 18 + values (?, ?, ?, ?, ?)`, 19 + did, rkey, cid, name, knot, 20 + ); err != nil { 21 + return fmt.Errorf("inserting repo: %w", err) 22 + } 23 + return nil 24 + } 25 + 26 + func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 27 + if _, err := e.ExecContext(ctx, 28 + `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 29 + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 30 + on conflict(did, rkey) do update set 31 + cid = excluded.cid, 32 + name = excluded.name, 33 + knot_domain = excluded.knot_domain, 34 + git_rev = excluded.git_rev, 35 + repo_sha = excluded.repo_sha, 36 + state = excluded.state, 37 + error_msg = excluded.error_msg, 38 + retry_count = excluded.retry_count, 39 + retry_after = excluded.retry_after`, 40 + // where repos.cid != excluded.cid`, 41 + repo.Did, 42 + repo.Rkey, 43 + repo.Cid, 44 + repo.Name, 45 + repo.KnotDomain, 46 + repo.GitRev, 47 + repo.RepoSha, 48 + repo.State, 49 + repo.ErrorMsg, 50 + repo.RetryCount, 51 + repo.RetryAfter, 52 + ); err != nil { 53 + return fmt.Errorf("upserting repo: %w", err) 54 + } 55 + return nil 56 + } 57 + 58 + func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 59 + if _, err := e.ExecContext(ctx, 60 + `update repos 61 + set state = ? 62 + where did = ? and rkey = ?`, 63 + state, 64 + did, rkey, 65 + ); err != nil { 66 + return fmt.Errorf("updating repo: %w", err) 67 + } 68 + return nil 69 + } 70 + 71 + func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 72 + if _, err := e.ExecContext(ctx, 73 + `delete from repos where did = ? and rkey = ?`, 74 + did, 75 + rkey, 76 + ); err != nil { 77 + return fmt.Errorf("deleting repo: %w", err) 78 + } 79 + return nil 80 + } 81 + 82 + func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) { 83 + var repo models.Repo 84 + if err := e.QueryRowContext(ctx, 85 + `select 86 + did, 87 + rkey, 88 + cid, 89 + name, 90 + knot_domain, 91 + git_rev, 92 + repo_sha, 93 + state, 94 + error_msg, 95 + retry_count, 96 + retry_after 97 + from repos 98 + where did = ? and name = ?`, 99 + did, 100 + name, 101 + ).Scan( 102 + &repo.Did, 103 + &repo.Rkey, 104 + &repo.Cid, 105 + &repo.Name, 106 + &repo.KnotDomain, 107 + &repo.GitRev, 108 + &repo.RepoSha, 109 + &repo.State, 110 + &repo.ErrorMsg, 111 + &repo.RetryCount, 112 + &repo.RetryAfter, 113 + ); err != nil { 114 + return nil, fmt.Errorf("querying repo: %w", err) 115 + } 116 + return &repo, nil 117 + } 118 + 119 + func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) { 120 + var repo models.Repo 121 + if err := e.QueryRowContext(ctx, 122 + `select 123 + did, 124 + rkey, 125 + cid, 126 + name, 127 + knot_domain, 128 + git_rev, 129 + repo_sha, 130 + state, 131 + error_msg, 132 + retry_count, 133 + retry_after 134 + from repos 135 + where at_uri = ?`, 136 + aturi, 137 + ).Scan( 138 + &repo.Did, 139 + &repo.Rkey, 140 + &repo.Cid, 141 + &repo.Name, 142 + &repo.KnotDomain, 143 + &repo.GitRev, 144 + &repo.RepoSha, 145 + &repo.State, 146 + &repo.ErrorMsg, 147 + &repo.RetryCount, 148 + &repo.RetryAfter, 149 + ); err != nil { 150 + if errors.Is(err, sql.ErrNoRows) { 151 + return nil, nil 152 + } 153 + return nil, fmt.Errorf("querying repo: %w", err) 154 + } 155 + return &repo, nil 156 + } 157 + 158 + func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) { 159 + var conditions []string 160 + var args []any 161 + 162 + for _, filter := range filters { 163 + conditions = append(conditions, filter.Condition()) 164 + args = append(args, filter.Arg()...) 165 + } 166 + 167 + whereClause := "" 168 + if len(conditions) > 0 { 169 + whereClause = "WHERE " + conditions[0] 170 + for _, condition := range conditions[1:] { 171 + whereClause += " AND " + condition 172 + } 173 + } 174 + pageClause := "" 175 + if page.Limit > 0 { 176 + pageClause = " limit ? offset ? " 177 + args = append(args, page.Limit, page.Offset) 178 + } 179 + 180 + query := ` 181 + select 182 + did, 183 + rkey, 184 + cid, 185 + name, 186 + knot_domain, 187 + git_rev, 188 + repo_sha, 189 + state, 190 + error_msg, 191 + retry_count, 192 + retry_after 193 + from repos 194 + ` + whereClause + pageClause 195 + rows, err := e.QueryContext(ctx, query, args...) 196 + if err != nil { 197 + return nil, err 198 + } 199 + defer rows.Close() 200 + 201 + var repos []models.Repo 202 + for rows.Next() { 203 + var repo models.Repo 204 + if err := rows.Scan( 205 + &repo.Did, 206 + &repo.Rkey, 207 + &repo.Cid, 208 + &repo.Name, 209 + &repo.KnotDomain, 210 + &repo.GitRev, 211 + &repo.RepoSha, 212 + &repo.State, 213 + &repo.ErrorMsg, 214 + &repo.RetryCount, 215 + &repo.RetryAfter, 216 + ); err != nil { 217 + return nil, fmt.Errorf("scanning row: %w", err) 218 + } 219 + repos = append(repos, repo) 220 + } 221 + if err := rows.Err(); err != nil { 222 + return nil, fmt.Errorf("scanning rows: %w ", err) 223 + } 224 + 225 + return repos, nil 226 + } 227 + 228 + func GetRepoCountsByState(ctx context.Context, e *sql.DB) (map[models.RepoState]int64, error) { 229 + const q = ` 230 + SELECT state, COUNT(*) 231 + FROM repos 232 + GROUP BY state 233 + ` 234 + 235 + rows, err := e.QueryContext(ctx, q) 236 + if err != nil { 237 + return nil, err 238 + } 239 + defer rows.Close() 240 + 241 + counts := make(map[models.RepoState]int64) 242 + 243 + for rows.Next() { 244 + var state string 245 + var count int64 246 + 247 + if err := rows.Scan(&state, &count); err != nil { 248 + return nil, err 249 + } 250 + 251 + counts[models.RepoState(state)] = count 252 + } 253 + 254 + if err := rows.Err(); err != nil { 255 + return nil, err 256 + } 257 + 258 + for _, s := range (models.RepoState("")).AllStates() { 259 + if _, ok := counts[s]; !ok { 260 + counts[s] = 0 261 + } 262 + } 263 + 264 + return counts, nil 265 + }
+104
knotmirror/knotmirror.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "time" 8 + 9 + "github.com/prometheus/client_golang/prometheus/promhttp" 10 + "tangled.org/core/knotmirror/config" 11 + "tangled.org/core/knotmirror/db" 12 + "tangled.org/core/knotmirror/knotstream" 13 + "tangled.org/core/log" 14 + ) 15 + 16 + func Run(ctx context.Context) error { 17 + // make sure every services are cleaned up on fast return 18 + ctx, cancel := context.WithCancel(ctx) 19 + defer cancel() 20 + 21 + logger := log.FromContext(ctx) 22 + cfg, err := config.Load(ctx) 23 + if err != nil { 24 + return fmt.Errorf("loading config: %w", err) 25 + } 26 + 27 + logger.Debug("config loaded:", "config", cfg) 28 + 29 + db, err := db.Make(ctx, cfg.DbPath) 30 + if err != nil { 31 + return fmt.Errorf("initializing db: %w", err) 32 + } 33 + 34 + knotstream := knotstream.NewKnotStream(logger, db, cfg) 35 + crawler := NewCrawler(logger, db) 36 + resyncer := NewResyncer(logger, db, cfg) 37 + adminpage := NewAdminServer(db) 38 + 39 + // maintain repository list with tap 40 + // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 41 + tap := NewTapClient(logger, cfg, db, knotstream) 42 + 43 + // start metrics endpoint 44 + go func() { 45 + metricsAddr := cfg.MetricsListen 46 + logger.Info("starting metrics server", "addr", metricsAddr) 47 + http.Handle("/metrics", promhttp.Handler()) 48 + if err := http.ListenAndServe(metricsAddr, nil); err != nil { 49 + logger.Error("metrics server failed", "error", err) 50 + } 51 + }() 52 + 53 + // start admin page endpoint 54 + go func() { 55 + logger.Info("starting admin server", "addr", cfg.AdminListen) 56 + if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil { 57 + logger.Error("admin server failed", "error", err) 58 + } 59 + }() 60 + 61 + tap.Start(ctx) 62 + 63 + resyncer.Start(ctx) 64 + 65 + // periodically crawl the entire network to mirror the repositories 66 + crawler.Start(ctx) 67 + 68 + // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 69 + knotstream.Start(ctx) 70 + 71 + svcErr := make(chan error, 1) 72 + if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 73 + svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 74 + } 75 + 76 + logger.Info("startup complete") 77 + select { 78 + case <-ctx.Done(): 79 + logger.Info("received shutdown signal", "reason", ctx.Err()) 80 + case err := <-svcErr: 81 + if err != nil { 82 + logger.Error("service error", "error", err) 83 + } 84 + cancel() 85 + } 86 + 87 + logger.Info("shutting down knotmirror") 88 + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 89 + defer shutdownCancel() 90 + 91 + var errs []error 92 + if err := knotstream.Shutdown(shutdownCtx); err != nil { 93 + errs = append(errs, err) 94 + } 95 + if err := db.Close(); err != nil { 96 + errs = append(errs, err) 97 + } 98 + for _, err := range errs { 99 + logger.Error("error during shutdown", "err", err) 100 + } 101 + 102 + logger.Info("shutdown complete") 103 + return nil 104 + }
+88
knotmirror/knotstream/knotstream.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "log/slog" 8 + "time" 9 + 10 + "tangled.org/core/knotmirror/config" 11 + "tangled.org/core/knotmirror/db" 12 + "tangled.org/core/knotmirror/models" 13 + "tangled.org/core/log" 14 + "tangled.org/core/orm" 15 + ) 16 + 17 + type KnotStream struct { 18 + logger *slog.Logger 19 + db *sql.DB 20 + slurper *KnotSlurper 21 + } 22 + 23 + func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream { 24 + l = log.SubLogger(l, "knotstream") 25 + return &KnotStream{ 26 + logger: l, 27 + db: db, 28 + slurper: NewKnotSlurper(l, db, cfg.Slurper), 29 + } 30 + } 31 + 32 + func (s *KnotStream) Start(ctx context.Context) { 33 + go s.slurper.Run(ctx) 34 + } 35 + 36 + func (s *KnotStream) Shutdown(ctx context.Context) error { 37 + return s.slurper.Shutdown(ctx) 38 + } 39 + 40 + func (s *KnotStream) CheckIfSubscribed(hostname string) bool { 41 + return s.slurper.CheckIfSubscribed(hostname) 42 + } 43 + 44 + func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 45 + s.logger.Debug("subscribe", "nossl", noSSL) 46 + host, err := db.GetHost(ctx, s.db, hostname) 47 + if err != nil { 48 + return fmt.Errorf("loading host from db: %w", err) 49 + } 50 + 51 + if host == nil { 52 + host = &models.Host{ 53 + Hostname: hostname, 54 + NoSSL: noSSL, 55 + Status: models.HostStatusActive, 56 + LastSeq: 0, 57 + } 58 + 59 + if err := db.UpsertHost(ctx, s.db, host); err != nil { 60 + return fmt.Errorf("adding host to db: %w", err) 61 + } 62 + 63 + s.logger.Info("adding new host subscription", "hostname", hostname, "noSSL", noSSL) 64 + } 65 + 66 + if host.Status == models.HostStatusBanned { 67 + return fmt.Errorf("cannot subscribe to banned knot") 68 + } 69 + return s.slurper.Subscribe(ctx, *host) 70 + } 71 + 72 + func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 73 + hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active")) 74 + if err != nil { 75 + return fmt.Errorf("listing hosts: %w", err) 76 + } 77 + 78 + for _, host := range hosts { 79 + l := s.logger.With("hostname", host.Hostname) 80 + l.Info("re-subscribing to active host") 81 + if err := s.slurper.Subscribe(ctx, host); err != nil { 82 + l.Warn("failed to re-subscribe to host", "err", err) 83 + } 84 + // sleep for a very short period, so we don't open tons of sockets at the same time 85 + time.Sleep(1 * time.Millisecond) 86 + } 87 + return nil 88 + }
+28
knotmirror/knotstream/metrics.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // KnotStream metrics 9 + var ( 10 + knotstreamEventsReceived = promauto.NewCounter(prometheus.CounterOpts{ 11 + Name: "knotmirror_knotstream_events_received_total", 12 + Help: "Total number of events received from knotstream", 13 + }) 14 + knotstreamEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{ 15 + Name: "knotmirror_knotstream_events_processed_total", 16 + Help: "Total number of events successfully processed", 17 + }) 18 + knotstreamEventsSkipped = promauto.NewCounter(prometheus.CounterOpts{ 19 + Name: "knotmirror_knotstream_events_skipped_total", 20 + Help: "Total number of events skipped (not tracked)", 21 + }) 22 + ) 23 + 24 + // slurper metrics 25 + var connectedInbound = promauto.NewGauge(prometheus.GaugeOpts{ 26 + Name: "knotmirror_connected_inbound", 27 + Help: "Number of inbound knotstream we are consuming", 28 + })
+102
knotmirror/knotstream/scheduler.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "sync" 7 + "sync/atomic" 8 + "time" 9 + 10 + "tangled.org/core/log" 11 + ) 12 + 13 + type ParallelScheduler struct { 14 + concurrency int 15 + 16 + do func(ctx context.Context, task *Task) error 17 + 18 + feeder chan *Task 19 + lk sync.Mutex 20 + scheduled map[string][]*Task 21 + lastSeq atomic.Int64 22 + 23 + logger *slog.Logger 24 + } 25 + 26 + type Task struct { 27 + key string 28 + message []byte 29 + } 30 + 31 + func NewParallelScheduler(maxC int, ident string, do func(context.Context, *Task) error) *ParallelScheduler { 32 + return &ParallelScheduler{ 33 + concurrency: maxC, 34 + do: do, 35 + feeder: make(chan *Task), 36 + scheduled: make(map[string][]*Task), 37 + logger: log.New("parallel-scheduler"), 38 + } 39 + } 40 + 41 + func (s *ParallelScheduler) Start(ctx context.Context) { 42 + for range s.concurrency { 43 + go s.ForEach(ctx, s.do) 44 + } 45 + } 46 + 47 + func (s *ParallelScheduler) AddTask(ctx context.Context, task *Task) { 48 + s.lk.Lock() 49 + if st, ok := s.scheduled[task.key]; ok { 50 + // schedule task 51 + s.scheduled[task.key] = append(st, task) 52 + s.lk.Unlock() 53 + return 54 + } 55 + s.scheduled[task.key] = []*Task{} 56 + s.lk.Unlock() 57 + 58 + select { 59 + case <-ctx.Done(): 60 + return 61 + case s.feeder <- task: 62 + return 63 + } 64 + } 65 + 66 + func (s *ParallelScheduler) ForEach(ctx context.Context, fn func(context.Context, *Task) error) { 67 + for task := range s.feeder { 68 + for task != nil { 69 + select { 70 + case <-ctx.Done(): 71 + return 72 + default: 73 + } 74 + if err := fn(ctx, task); err != nil { 75 + s.logger.Error("event handler failed", "err", err) 76 + } 77 + 78 + s.lk.Lock() 79 + func() { 80 + rem, ok := s.scheduled[task.key] 81 + if !ok { 82 + s.logger.Error("should always have an 'active' entry if a worker is processing a job") 83 + } 84 + if len(rem) == 0 { 85 + delete(s.scheduled, task.key) 86 + task = nil 87 + } else { 88 + task = rem[0] 89 + s.scheduled[task.key] = rem[1:] 90 + } 91 + 92 + // TODO: update seq from received message 93 + s.lastSeq.Store(time.Now().UnixNano()) 94 + }() 95 + s.lk.Unlock() 96 + } 97 + } 98 + } 99 + 100 + func (s *ParallelScheduler) LastSeq() int64 { 101 + return s.lastSeq.Load() 102 + }
+334
knotmirror/knotstream/slurper.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "net/http" 11 + "sync" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/util/ssrf" 16 + "github.com/carlmjohnson/versioninfo" 17 + "github.com/gorilla/websocket" 18 + "tangled.org/core/api/tangled" 19 + "tangled.org/core/knotmirror/config" 20 + "tangled.org/core/knotmirror/db" 21 + "tangled.org/core/knotmirror/models" 22 + "tangled.org/core/log" 23 + ) 24 + 25 + type KnotSlurper struct { 26 + logger *slog.Logger 27 + db *sql.DB 28 + cfg config.SlurperConfig 29 + 30 + subsLk sync.Mutex 31 + subs map[string]*subscription 32 + } 33 + 34 + func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper { 35 + return &KnotSlurper{ 36 + logger: log.SubLogger(l, "slurper"), 37 + db: db, 38 + cfg: cfg, 39 + subs: make(map[string]*subscription), 40 + } 41 + } 42 + 43 + func (s *KnotSlurper) Run(ctx context.Context) { 44 + for { 45 + select { 46 + case <-ctx.Done(): 47 + return 48 + case <-time.After(s.cfg.PersistCursorPeriod): 49 + if err := s.persistCursors(ctx); err != nil { 50 + s.logger.Error("failed to flush cursors", "err", err) 51 + } 52 + } 53 + } 54 + } 55 + 56 + func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool { 57 + s.subsLk.Lock() 58 + defer s.subsLk.Unlock() 59 + 60 + _, ok := s.subs[hostname] 61 + return ok 62 + } 63 + 64 + func (s *KnotSlurper) Shutdown(ctx context.Context) error { 65 + s.logger.Info("starting shutdown host cursor flush") 66 + err := s.persistCursors(ctx) 67 + if err != nil { 68 + s.logger.Error("shutdown error", "err", err) 69 + } 70 + s.logger.Info("slurper shutdown complete") 71 + return err 72 + } 73 + 74 + func (s *KnotSlurper) persistCursors(ctx context.Context) error { 75 + // // gather cursor list from subscriptions and store them to DB 76 + // start := time.Now() 77 + 78 + s.subsLk.Lock() 79 + cursors := make([]models.HostCursor, len(s.subs)) 80 + i := 0 81 + for _, sub := range s.subs { 82 + cursors[i] = sub.HostCursor() 83 + i++ 84 + } 85 + s.subsLk.Unlock() 86 + 87 + err := db.StoreCursors(ctx, s.db, cursors) 88 + // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 89 + return err 90 + } 91 + 92 + func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error { 93 + s.subsLk.Lock() 94 + defer s.subsLk.Unlock() 95 + 96 + _, ok := s.subs[host.Hostname] 97 + if ok { 98 + return fmt.Errorf("already subscribed: %s", host.Hostname) 99 + } 100 + 101 + // TODO: include `cancel` function to kill subscription by hostname 102 + sub := &subscription{ 103 + hostname: host.Hostname, 104 + scheduler: NewParallelScheduler( 105 + s.cfg.ConcurrencyPerHost, 106 + host.Hostname, 107 + s.ProcessEvent, 108 + ), 109 + } 110 + s.subs[host.Hostname] = sub 111 + 112 + sub.scheduler.Start(ctx) 113 + go s.subscribeWithRedialer(ctx, host, sub) 114 + return nil 115 + } 116 + 117 + func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) { 118 + l := s.logger.With("host", host.Hostname) 119 + 120 + dialer := websocket.Dialer{ 121 + HandshakeTimeout: time.Second * 5, 122 + } 123 + 124 + // if this isn't a localhost / private connection, then we should enable SSRF protections 125 + if !host.NoSSL { 126 + netDialer := ssrf.PublicOnlyDialer() 127 + dialer.NetDialContext = netDialer.DialContext 128 + } 129 + 130 + cursor := host.LastSeq 131 + 132 + connectedInbound.Inc() 133 + defer connectedInbound.Dec() 134 + 135 + var backoff int 136 + for { 137 + select { 138 + case <-ctx.Done(): 139 + return 140 + default: 141 + } 142 + u := host.LegacyEventsURL(cursor) 143 + l.Debug("made url with cursor", "cursor", cursor, "url", u) 144 + 145 + // NOTE: manual backoff retry implementation to explicitly handle fails 146 + hdr := make(http.Header) 147 + hdr.Add("User-Agent", userAgent()) 148 + conn, resp, err := dialer.DialContext(ctx, u, hdr) 149 + if err != nil { 150 + l.Warn("dialing failed", "err", err, "backoff", backoff) 151 + time.Sleep(sleepForBackoff(backoff)) 152 + backoff++ 153 + if backoff > 15 { 154 + l.Warn("host does not appear to be online, disabling for now") 155 + host.Status = models.HostStatusOffline 156 + if err := db.UpsertHost(ctx, s.db, &host); err != nil { 157 + l.Error("failed to update host status", "err", err) 158 + } 159 + return 160 + } 161 + continue 162 + } 163 + 164 + l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u) 165 + 166 + if err := s.handleConnection(ctx, conn, sub); err != nil { 167 + // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl 168 + l.Warn("host connection failed", "err", err, "backoff", backoff) 169 + } 170 + 171 + updatedCursor := sub.LastSeq() 172 + didProgress := updatedCursor > cursor 173 + l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress) 174 + if cursor == 0 || didProgress { 175 + cursor = updatedCursor 176 + backoff = 0 177 + 178 + batch := []models.HostCursor{sub.HostCursor()} 179 + if err := db.StoreCursors(ctx, s.db, batch); err != nil { 180 + l.Error("failed to store cursors", "err", err) 181 + } 182 + } 183 + } 184 + } 185 + 186 + // handleConnection handles websocket connection. 187 + // Schedules task from received event and return when connection is closed 188 + func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error { 189 + // ping on every 30s 190 + ctx, cancel := context.WithCancel(ctx) 191 + defer cancel() // close the background ping job on connection close 192 + go func() { 193 + t := time.NewTicker(30 * time.Second) 194 + defer t.Stop() 195 + failcount := 0 196 + 197 + for { 198 + select { 199 + case <-t.C: 200 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 201 + s.logger.Warn("failed to ping", "err", err) 202 + failcount++ 203 + if failcount >= 4 { 204 + s.logger.Error("too many ping fails", "count", failcount) 205 + _ = conn.Close() 206 + return 207 + } 208 + } else { 209 + failcount = 0 // ok ping 210 + } 211 + case <-ctx.Done(): 212 + _ = conn.Close() 213 + return 214 + } 215 + } 216 + }() 217 + 218 + conn.SetPingHandler(func(message string) error { 219 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute)) 220 + if err == websocket.ErrCloseSent { 221 + return nil 222 + } 223 + return err 224 + }) 225 + conn.SetPongHandler(func(_ string) error { 226 + if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 227 + s.logger.Error("failed to set read deadline", "err", err) 228 + } 229 + return nil 230 + }) 231 + 232 + for { 233 + select { 234 + case <-ctx.Done(): 235 + return ctx.Err() 236 + default: 237 + } 238 + msgType, msg, err := conn.ReadMessage() 239 + if err != nil { 240 + return err 241 + } 242 + 243 + if msgType != websocket.TextMessage { 244 + continue 245 + } 246 + 247 + sub.scheduler.AddTask(ctx, &Task{ 248 + key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 249 + message: msg, 250 + }) 251 + } 252 + } 253 + 254 + type LegacyGitEvent struct { 255 + Rkey string 256 + Nsid string 257 + Event tangled.GitRefUpdate 258 + } 259 + 260 + func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { 261 + var legacyMessage LegacyGitEvent 262 + if err := json.Unmarshal(task.message, &legacyMessage); err != nil { 263 + return fmt.Errorf("unmarshaling message: %w", err) 264 + } 265 + 266 + if err := s.ProcessLegacyGitRefUpdate(ctx, &legacyMessage); err != nil { 267 + return fmt.Errorf("processing gitRefUpdate: %w", err) 268 + } 269 + return nil 270 + } 271 + 272 + func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, evt *LegacyGitEvent) error { 273 + knotstreamEventsReceived.Inc() 274 + 275 + curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(evt.Event.RepoDid), evt.Event.RepoName) 276 + if err != nil { 277 + return fmt.Errorf("failed to get repo '%s': %w", evt.Event.RepoDid+"/"+evt.Event.RepoName, err) 278 + } 279 + if curr == nil { 280 + // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 281 + // 282 + // Normally did+name is already enough to perform git-fetch as that's 283 + // what needed to fetch the repository. 284 + // But we want to store that in did/rkey in knot-mirror. 285 + // Therefore, we should ignore when the repository is unknown. 286 + // Hopefully crawler will sync it later. 287 + s.logger.Warn("skipping event from unknown repo", "did/repo", evt.Event.RepoDid+"/"+evt.Event.RepoName) 288 + knotstreamEventsSkipped.Inc() 289 + return nil 290 + } 291 + l := s.logger.With("repoAt", curr.AtUri()) 292 + 293 + // TODO: should plan resync to resyncBuffer on RepoStateResyncing 294 + if curr.State != models.RepoStateActive { 295 + l.Debug("skipping non-active repo") 296 + knotstreamEventsSkipped.Inc() 297 + return nil 298 + } 299 + 300 + if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() { 301 + l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev) 302 + knotstreamEventsSkipped.Inc() 303 + return nil 304 + } 305 + 306 + // if curr.State == models.RepoStateResyncing { 307 + // firehoseEventsSkipped.Inc() 308 + // return fp.events.addToResyncBuffer(ctx, commit) 309 + // } 310 + 311 + // can't skip anything, update repo state 312 + if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil { 313 + return err 314 + } 315 + 316 + l.Info("event processed", "eventRev", evt.Rkey) 317 + 318 + knotstreamEventsProcessed.Inc() 319 + return nil 320 + } 321 + 322 + func userAgent() string { 323 + return fmt.Sprintf("knotmirror/%s", versioninfo.Short()) 324 + } 325 + 326 + func sleepForBackoff(b int) time.Duration { 327 + if b == 0 { 328 + return 0 329 + } 330 + if b < 10 { 331 + return (time.Duration(b) * 2) + (time.Millisecond * time.Duration(rand.Intn(1000))) 332 + } 333 + return time.Second * 30 334 + }
+22
knotmirror/knotstream/subscription.go
··· 1 + package knotstream 2 + 3 + import "tangled.org/core/knotmirror/models" 4 + 5 + // subscription represents websocket connection with that host 6 + type subscription struct { 7 + hostname string 8 + 9 + // embedded parallel job scheduler 10 + scheduler *ParallelScheduler 11 + } 12 + 13 + func (s *subscription) LastSeq() int64 { 14 + return s.scheduler.LastSeq() 15 + } 16 + 17 + func (s *subscription) HostCursor() models.HostCursor { 18 + return models.HostCursor{ 19 + Hostname: s.hostname, 20 + LastSeq: s.LastSeq(), 21 + } 22 + }
+29
knotmirror/metrics.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // Resync metrics 9 + var ( 10 + // TODO: 11 + // - working / waiting resycner counts 12 + resyncsStarted = promauto.NewCounter(prometheus.CounterOpts{ 13 + Name: "knotmirror_resyncs_started_total", 14 + Help: "Total number of repo resyncs started", 15 + }) 16 + resyncsCompleted = promauto.NewCounter(prometheus.CounterOpts{ 17 + Name: "knotmirror_resyncs_completed_total", 18 + Help: "Total number of repo resyncs completed", 19 + }) 20 + resyncsFailed = promauto.NewCounter(prometheus.CounterOpts{ 21 + Name: "knotmirror_resyncs_failed_total", 22 + Help: "Total number of repo resyncs failed", 23 + }) 24 + resyncDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 25 + Name: "knotmirror_resync_duration_seconds", 26 + Help: "Duration of repo resync operations", 27 + Buckets: prometheus.ExponentialBuckets(0.1, 2, 12), 28 + }) 29 + )
+110
knotmirror/models/models.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 8 + ) 9 + 10 + type Repo struct { 11 + Did syntax.DID 12 + Rkey syntax.RecordKey 13 + Cid *syntax.CID 14 + // content of tangled.Repo 15 + Name string 16 + KnotDomain string 17 + 18 + GitRev syntax.TID // last processed git.refUpdate revision 19 + RepoSha string // sha256 sum of git refs (to avoid no-op git fetch) 20 + State RepoState 21 + ErrorMsg string 22 + RetryCount int 23 + RetryAfter int64 // Unix timestamp (seconds) 24 + } 25 + 26 + func (r *Repo) AtUri() syntax.ATURI { 27 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.RepoNSID, r.Rkey)) 28 + } 29 + 30 + func (r *Repo) DidSlashRepo() string { 31 + return fmt.Sprintf("%s/%s", r.Did, r.Name) 32 + } 33 + 34 + type RepoState string 35 + 36 + const ( 37 + RepoStatePending RepoState = "pending" 38 + RepoStateDesynchronized RepoState = "desynchronized" 39 + RepoStateResyncing RepoState = "resyncing" 40 + RepoStateActive RepoState = "active" 41 + RepoStateSuspended RepoState = "suspended" 42 + RepoStateError RepoState = "error" 43 + ) 44 + 45 + func (s RepoState) AllStates() []RepoState { 46 + return []RepoState{ 47 + RepoStatePending, 48 + RepoStateDesynchronized, 49 + RepoStateResyncing, 50 + RepoStateActive, 51 + RepoStateSuspended, 52 + RepoStateError, 53 + } 54 + } 55 + 56 + type HostCursor struct { 57 + Hostname string 58 + LastSeq int64 59 + } 60 + 61 + type Host struct { 62 + Hostname string 63 + NoSSL bool 64 + Status HostStatus 65 + LastSeq int64 66 + } 67 + 68 + type HostStatus string 69 + 70 + const ( 71 + HostStatusActive HostStatus = "active" 72 + HostStatusIdle HostStatus = "idle" 73 + HostStatusOffline HostStatus = "offline" 74 + HostStatusThrottled HostStatus = "throttled" 75 + HostStatusBanned HostStatus = "banned" 76 + ) 77 + 78 + func (s HostStatus) AllStatuses() []HostStatus { 79 + return []HostStatus{ 80 + HostStatusActive, 81 + HostStatusIdle, 82 + HostStatusOffline, 83 + HostStatusThrottled, 84 + HostStatusBanned, 85 + } 86 + } 87 + 88 + // func (h *Host) SubscribeGitRefsURL(cursor int64) string { 89 + // scheme := "wss" 90 + // if h.NoSSL { 91 + // scheme = "ws" 92 + // } 93 + // u := fmt.Sprintf("%s://%s/xrpc/%s", scheme, h.Hostname, tangled.SubscribeGitRefsNSID) 94 + // if cursor > 0 { 95 + // u = fmt.Sprintf("%s?cursor=%d", u, h.LastSeq) 96 + // } 97 + // return u 98 + // } 99 + 100 + func (h *Host) LegacyEventsURL(cursor int64) string { 101 + scheme := "wss" 102 + if h.NoSSL { 103 + scheme = "ws" 104 + } 105 + u := fmt.Sprintf("%s://%s/events", scheme, h.Hostname) 106 + if cursor > 0 { 107 + u = fmt.Sprintf("%s?cursor=%d", u, cursor) 108 + } 109 + return u 110 + }
+16
knotmirror/readme.md
··· 1 + # KnotMirror 2 + 3 + Mirror of all known repos. Heavily inspired by [indigo/relay] and [indigo/tap]. 4 + 5 + Knot Mirror syncs repo list using tap and subscribe to all known knots as KnotStream. 6 + 7 + # TODO 8 + 9 + - [ ] cleanup 'resyncing' state on shutdown (or on startup too) 10 + - [ ] better tap reconnecting logic 11 + - [ ] handle really large repos (maybe shallow-clone first?) 12 + 13 + idea: run multiple different resync workers. 4 for long running tasks, 10 for short tasks. on timeout, schedule it for long running task 14 + 15 + [indigo/relay]: https://github.com/bluesky-social/indigo/tree/main/cmd/relay 16 + [indigo/tap]: https://github.com/bluesky-social/indigo/tree/main/cmd/tap
+260
knotmirror/resyncer.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "net/url" 11 + "path" 12 + "sync" 13 + "time" 14 + 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 + "github.com/go-git/go-git/v5" 17 + gitconfig "github.com/go-git/go-git/v5/config" 18 + "github.com/go-git/go-git/v5/plumbing/transport" 19 + "tangled.org/core/knotmirror/config" 20 + "tangled.org/core/knotmirror/db" 21 + "tangled.org/core/knotmirror/models" 22 + "tangled.org/core/log" 23 + ) 24 + 25 + type Resyncer struct { 26 + logger *slog.Logger 27 + db *sql.DB 28 + 29 + claimJobMu sync.Mutex 30 + 31 + repoBasePath string 32 + repoFetchTimeout time.Duration 33 + knotUseSSL bool 34 + 35 + parallelism int 36 + } 37 + 38 + func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer { 39 + return &Resyncer{ 40 + logger: log.SubLogger(l, "resyncer"), 41 + db: db, 42 + repoBasePath: cfg.GitRepoBasePath, 43 + repoFetchTimeout: cfg.GitRepoFetchTimeout, 44 + knotUseSSL: cfg.KnotUseSSL, 45 + parallelism: cfg.ResyncParallelism, 46 + } 47 + } 48 + 49 + func (r *Resyncer) Start(ctx context.Context) { 50 + for i := 0; i < r.parallelism; i++ { 51 + go r.runResyncWorker(ctx, i) 52 + } 53 + } 54 + 55 + func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 56 + l := r.logger.With("worker", workerID) 57 + for { 58 + select { 59 + case <-ctx.Done(): 60 + l.Info("resync worker shutting down", "error", ctx.Err()) 61 + return 62 + default: 63 + } 64 + repoAt, found, err := r.claimResyncJob(ctx) 65 + if err != nil { 66 + l.Error("failed to claim resync job", "error", err) 67 + time.Sleep(time.Second) 68 + continue 69 + } 70 + if !found { 71 + time.Sleep(time.Second) 72 + continue 73 + } 74 + l.Info("processing resync", "aturi", repoAt) 75 + if err := r.resyncRepo(ctx, repoAt); err != nil { 76 + l.Error("resync failed", "aturi", repoAt, "error", err) 77 + } 78 + } 79 + } 80 + 81 + func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 82 + // use mutex to prevent duplicated jobs 83 + r.claimJobMu.Lock() 84 + defer r.claimJobMu.Unlock() 85 + 86 + var repoAt syntax.ATURI 87 + now := time.Now().Unix() 88 + if err := r.db.QueryRowContext(ctx, 89 + `update repos 90 + set state = ? 91 + where at_uri = ( 92 + select at_uri from repos 93 + where state in (?, ?, ?) 94 + and (retry_after = 0 or retry_after < ?) 95 + limit 1 96 + ) 97 + returning at_uri 98 + `, 99 + models.RepoStateResyncing, 100 + models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 101 + now, 102 + ).Scan(&repoAt); err != nil { 103 + if errors.Is(err, sql.ErrNoRows) { 104 + return "", false, nil 105 + } 106 + return "", false, err 107 + } 108 + 109 + return repoAt, true, nil 110 + } 111 + 112 + func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 113 + // ctx, span := tracer.Start(ctx, "resyncRepo") 114 + // span.SetAttributes(attribute.String("aturi", repoAt)) 115 + // defer span.End() 116 + 117 + resyncsStarted.Inc() 118 + startTime := time.Now() 119 + 120 + success, err := r.doResync(ctx, repoAt) 121 + if !success { 122 + resyncsFailed.Inc() 123 + resyncDuration.Observe(time.Since(startTime).Seconds()) 124 + return r.handleResyncError(ctx, repoAt, err) 125 + } 126 + 127 + resyncsCompleted.Inc() 128 + resyncDuration.Observe(time.Since(startTime).Seconds()) 129 + return nil 130 + } 131 + 132 + func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 133 + // ctx, span := tracer.Start(ctx, "doResync") 134 + // span.SetAttributes(attribute.String("aturi", repoAt)) 135 + // defer span.End() 136 + 137 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 138 + if err != nil { 139 + return false, fmt.Errorf("failed to get repo: %w", err) 140 + } 141 + if repo == nil { // untracked repo, skip 142 + return false, nil 143 + } 144 + 145 + repoPath := r.repoPath(repo) 146 + remoteUrl := r.repoRemoteURL(repo) 147 + l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath, "url", remoteUrl) 148 + 149 + ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 150 + defer cancel() 151 + 152 + // TODO: check if Knot is on backoff list. If so, return (false, nil) 153 + // TODO: use r.repoFetchTimeout on fetch 154 + // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 155 + gr, err := git.PlainOpen(repoPath) 156 + if errors.Is(err, git.ErrRepositoryNotExists) { 157 + l.Debug("cloning repo") 158 + // if err := exec.Command("git", "clone", "--mirror", remoteUrl, repoPath).Run(); err != nil { 159 + // return false, fmt.Errorf("cloning repo: %w", err) 160 + // } 161 + _, err := git.PlainCloneContext(ctx, repoPath, true, &git.CloneOptions{ 162 + URL: remoteUrl, 163 + Mirror: true, 164 + }) 165 + if err != nil && !errors.Is(err, transport.ErrEmptyRemoteRepository) { 166 + return false, fmt.Errorf("cloning repo: %w", err) 167 + } 168 + } else { 169 + if err != nil { 170 + return false, fmt.Errorf("laoding repo: %w", err) 171 + } 172 + l.Debug("fetching repo") 173 + // if err := exec.Command("git", "-C", repoPath, "fetch", "--mirror", remoteUrl).Run(); err != nil { 174 + // return false, fmt.Errorf("fetching repo: %w", err) 175 + // } 176 + if err := gr.FetchContext(ctx, &git.FetchOptions{ 177 + RemoteURL: remoteUrl, 178 + RefSpecs: []gitconfig.RefSpec{gitconfig.RefSpec("+refs/*:refs/*")}, 179 + Force: true, 180 + Prune: true, 181 + }); err != nil { 182 + return false, fmt.Errorf("fetching reppo: %w", err) 183 + } 184 + } 185 + 186 + // repo.GitRev = <processed git.refUpdate revision> 187 + // repo.RepoSha = <sha256 sum of git refs> 188 + repo.State = models.RepoStateActive 189 + repo.ErrorMsg = "" 190 + repo.RetryCount = 0 191 + repo.RetryAfter = 0 192 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 193 + return false, fmt.Errorf("updating repo state to active %w", err) 194 + } 195 + return true, nil 196 + } 197 + 198 + func (r *Resyncer) handleResyncError(ctx context.Context, repoAt syntax.ATURI, err error) error { 199 + r.logger.Debug("handleResyncError", "at_uri", repoAt, "err", err) 200 + var state models.RepoState 201 + var errMsg string 202 + if err == nil { 203 + state = models.RepoStateDesynchronized 204 + errMsg = "" 205 + } else { 206 + state = models.RepoStateError 207 + errMsg = err.Error() 208 + } 209 + 210 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 211 + if err != nil { 212 + return err 213 + } 214 + if repo == nil { 215 + return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) 216 + } 217 + 218 + var retryCount = repo.RetryCount + 1 219 + var retryAfter int64 220 + if retryCount >= 5 { 221 + state = models.RepoStateSuspended 222 + errMsg = "too many resync fails" 223 + retryAfter = 0 224 + } else { 225 + // start a 1 min & go up to 1 hr between retries 226 + retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 227 + } 228 + 229 + repo.State = state 230 + repo.ErrorMsg = errMsg 231 + repo.RetryCount = retryCount 232 + repo.RetryAfter = retryAfter 233 + if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 234 + return dbErr 235 + } 236 + return err 237 + } 238 + 239 + func (r *Resyncer) repoPath(repo *models.Repo) string { 240 + return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 241 + } 242 + 243 + func (r *Resyncer) repoRemoteURL(repo *models.Repo) string { 244 + u, _ := url.Parse(repo.KnotDomain) 245 + if u.Scheme == "" { 246 + if r.knotUseSSL { 247 + u.Scheme = "https" 248 + } else { 249 + u.Scheme = "http" 250 + } 251 + } 252 + u = u.JoinPath(repo.DidSlashRepo()) 253 + return u.String() 254 + } 255 + 256 + func backoff(retries int, max int) time.Duration { 257 + dur := min(1<<retries, max) 258 + jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 259 + return time.Second*time.Duration(dur) + jitter 260 + }
+114
knotmirror/tapclient.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "net/url" 10 + "time" 11 + 12 + "tangled.org/core/api/tangled" 13 + "tangled.org/core/knotmirror/config" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/knotstream" 16 + "tangled.org/core/knotmirror/models" 17 + "tangled.org/core/log" 18 + "tangled.org/core/tap" 19 + ) 20 + 21 + type Tap struct { 22 + logger *slog.Logger 23 + cfg *config.Config 24 + tap tap.Client 25 + db *sql.DB 26 + ks *knotstream.KnotStream 27 + } 28 + 29 + func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap { 30 + return &Tap{ 31 + logger: log.SubLogger(l, "tapclient"), 32 + cfg: cfg, 33 + tap: tap.NewClient(cfg.TapUrl, ""), 34 + db: db, 35 + ks: ks, 36 + } 37 + } 38 + 39 + func (t *Tap) Start(ctx context.Context) { 40 + // TODO: better reconnect logic 41 + go func() { 42 + for { 43 + t.tap.Connect(ctx, &tap.SimpleIndexer{ 44 + EventHandler: t.processEvent, 45 + }) 46 + time.Sleep(time.Second) 47 + } 48 + }() 49 + } 50 + 51 + func (t *Tap) processEvent(ctx context.Context, evt tap.Event) error { 52 + l := t.logger.With("component", "tapIndexer") 53 + 54 + var err error 55 + switch evt.Type { 56 + case tap.EvtRecord: 57 + switch evt.Record.Collection.String() { 58 + case tangled.RepoNSID: 59 + err = t.processRepo(ctx, evt.Record) 60 + } 61 + } 62 + 63 + if err != nil { 64 + l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 65 + return err 66 + } 67 + return nil 68 + } 69 + 70 + func (t *Tap) processRepo(ctx context.Context, evt *tap.RecordEventData) error { 71 + switch evt.Action { 72 + case tap.RecordCreateAction, tap.RecordUpdateAction: 73 + record := tangled.Repo{} 74 + if err := json.Unmarshal(evt.Record, &record); err != nil { 75 + return fmt.Errorf("parsing record: %w", err) 76 + } 77 + 78 + status := models.RepoStatePending 79 + errMsg := "" 80 + u, err := url.Parse("http://"+record.Knot) // parsing with fake scheme 81 + if err != nil { 82 + status = models.RepoStateSuspended 83 + errMsg = "failed to parse knot url" 84 + } 85 + if u.Hostname() == "localhost" { 86 + status = models.RepoStateSuspended 87 + errMsg = "suspending localhost knot" 88 + } 89 + 90 + if err := db.UpsertRepo(ctx, t.db, &models.Repo{ 91 + Did: evt.Did, 92 + Rkey: evt.Rkey, 93 + Cid: evt.CID, 94 + Name: record.Name, 95 + KnotDomain: record.Knot, 96 + State: status, 97 + ErrorMsg: errMsg, 98 + }); err != nil { 99 + return fmt.Errorf("upserting repo to db: %w", err) 100 + } 101 + 102 + if !t.ks.CheckIfSubscribed(record.Knot) { 103 + if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 104 + return fmt.Errorf("subscribing to knot: %w", err) 105 + } 106 + } 107 + 108 + case tap.RecordDeleteAction: 109 + if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 110 + return fmt.Errorf("deleting repo from db: %w", err) 111 + } 112 + } 113 + return nil 114 + }
+26
knotmirror/templates/base.html
··· 1 + {{define "base"}} 2 + <!DOCTYPE html> 3 + <html> 4 + <head> 5 + <title>KnotMirror Admin</title> 6 + <script src="https://cdn.jsdelivr.net/npm/htmx.org@2.0.8/dist/htmx.min.js" integrity="sha384-/TgkGk7p307TH7EXJDuUlgG3Ce1UVolAOFopFekQkkXihi5u/6OCvVKyz1W+idaz" crossorigin="anonymous"></script> 7 + <style> 8 + nav { margin-bottom: 20px; border-bottom: 1px solid #ccc; padding: 10px 0; } 9 + nav a { margin-right: 15px; } 10 + table { width: 100%; border-collapse: collapse; } 11 + th, td { text-align: left; padding: 8px; border: 1px solid #ddd; } 12 + .pagination { margin-top: 20px; } 13 + .filters { background: #f4f4f4; padding: 15px; margin-bottom: 20px; } 14 + </style> 15 + </head> 16 + <body> 17 + <nav> 18 + <a href="/repos">Repositories</a> 19 + <a href="/hosts">Knot Hosts</a> 20 + </nav> 21 + <main id="main"> 22 + {{template "content" .}} 23 + </main> 24 + </body> 25 + </html> 26 + {{end}}
+45
knotmirror/templates/hosts.html
··· 1 + {{template "base" .}} 2 + {{define "content"}} 3 + <h2>Knot Hosts</h2> 4 + 5 + <div class="filters"> 6 + <form 7 + hx-get="" 8 + hx-target="#table" 9 + hx-select="#table" 10 + hx-swap="outerHTML" 11 + hx-trigger="every 10s" 12 + > 13 + <select name="status"> 14 + <option value="">-- Statuse --</option> 15 + {{ range .FilterByStatus.AllStatuses }} 16 + <option value="{{.}}" {{ if eq $.FilterByStatus . }}selected{{end}}>{{.}}</option> 17 + {{ end }} 18 + </select> 19 + <button type="submit">Filter</button> 20 + </form> 21 + </div> 22 + 23 + <table id="table"> 24 + <thead> 25 + <tr> 26 + <th>Hostname</th> 27 + <th>SSL</th> 28 + <th>Status</th> 29 + <th>Last Seq</th> 30 + </tr> 31 + </thead> 32 + <tbody> 33 + {{range .Hosts}} 34 + <tr> 35 + <td>{{.Hostname}}</td> 36 + <td>{{if .NoSSL}}False{{else}}True{{end}}</td> 37 + <td>{{.Status}}</td> 38 + <td>{{.LastSeq}}</td> 39 + </tr> 40 + {{else}} 41 + <tr><td colspan="4">No hosts registered.</td></tr> 42 + {{end}} 43 + </tbody> 44 + </table> 45 + {{end}}
+71
knotmirror/templates/repos.html
··· 1 + {{template "base" .}} 2 + {{define "content"}} 3 + <h2>Repositories</h2> 4 + 5 + <div class="filters"> 6 + <form 7 + hx-get="" 8 + hx-target="#table" 9 + hx-select="#table" 10 + hx-swap="outerHTML" 11 + hx-trigger="every 10s" 12 + > 13 + <input type="text" name="did" placeholder="DID" value="{{.FilterByDid}}"> 14 + <input type="text" name="knot" placeholder="Knot Domain" value="{{.FilterByKnot}}"> 15 + <select name="state"> 16 + <option value="">-- State --</option> 17 + {{ range .FilterByState.AllStates }} 18 + <option value="{{.}}" {{ if eq $.FilterByState . }}selected{{end}}>{{.}}</option> 19 + {{ end }} 20 + </select> 21 + <button type="submit">Filter</button> 22 + <a href="/repos">Clear</a> 23 + </form> 24 + </div> 25 + 26 + <div id="table"> 27 + <div class="repo-state-indicators"> 28 + {{range .FilterByState.AllStates}} 29 + <span class="state-pill state-{{.}}"> 30 + {{.}}: {{index $.RepoCounts .}} 31 + </span> 32 + {{end}} 33 + </div> 34 + <table> 35 + <thead> 36 + <tr> 37 + <th>DID</th> 38 + <th>Name</th> 39 + <th>Knot</th> 40 + <th>State</th> 41 + <th>Retry</th> 42 + <th>Retry After</th> 43 + <th>Error Message</th> 44 + </tr> 45 + </thead> 46 + <tbody> 47 + {{range .Repos}} 48 + <tr> 49 + <td><code>{{.Did}}</code></td> 50 + <td>{{.Name}}</td> 51 + <td>{{.KnotDomain}}</td> 52 + <td><strong>{{.State}}</strong></td> 53 + <td>{{.RetryCount}}</td> 54 + <td>{{readt .RetryAfter}}</td> 55 + <td>{{.ErrorMsg}}</td> 56 + </tr> 57 + {{else}} 58 + <tr><td colspan="99">No repositories found.</td></tr> 59 + {{end}} 60 + </tbody> 61 + </table> 62 + </div> 63 + 64 + <div class="pagination"> 65 + {{if gt .Page 1}} 66 + <a href="?page={{sub .Page 1}}&did={{.FilterByDid}}&knot={{.FilterByKnot}}&state={{.FilterByState}}">« Previous</a> 67 + {{end}} 68 + <span>Page {{.Page}}</span> 69 + <a href="?page={{add .Page 1}}&did={{.FilterByDid}}&knot={{.FilterByKnot}}&state={{.FilterByState}}">Next »</a> 70 + </div> 71 + {{end}}
+17 -14
nix/gomod2nix.toml
··· 470 470 version = "v1.5.5" 471 471 hash = "sha256-ouhfDUCXsfpcgaCLfJE9oYprAQHuV61OJzb/aEhT0j8=" 472 472 [mod."github.com/prometheus/client_golang"] 473 - version = "v1.22.0" 474 - hash = "sha256-OJ/9rlWG1DIPQJAZUTzjykkX0o+f+4IKLvW8YityaMQ=" 473 + version = "v1.23.2" 474 + hash = "sha256-3GD4fBFa1tJu8MS4TNP6r2re2eViUE+kWUaieIOQXCg=" 475 475 [mod."github.com/prometheus/client_model"] 476 476 version = "v0.6.2" 477 477 hash = "sha256-q6Fh6v8iNJN9ypD47LjWmx66YITa3FyRjZMRsuRTFeQ=" 478 478 [mod."github.com/prometheus/common"] 479 - version = "v0.64.0" 480 - hash = "sha256-uy3KO60F2Cvhamz3fWQALGSsy13JiTk3NfpXgRuwqtI=" 479 + version = "v0.66.1" 480 + hash = "sha256-bqHPaV9IV70itx63wqwgy2PtxMN0sn5ThVxDmiD7+Tk=" 481 481 [mod."github.com/prometheus/procfs"] 482 482 version = "v0.16.1" 483 483 hash = "sha256-OBCvKlLW2obct35p0L9Q+1ZrxZjpTmbgHMP2rng9hpo=" ··· 510 510 version = "v0.0.0-20220730225603-2ab79fcdd4ef" 511 511 hash = "sha256-/XmSE/J+f6FLWXGvljh6uBK71uoCAK3h82XQEQ1Ki68=" 512 512 [mod."github.com/stretchr/testify"] 513 - version = "v1.10.0" 514 - hash = "sha256-fJ4gnPr0vnrOhjQYQwJ3ARDKPsOtA7d4olQmQWR+wpI=" 513 + version = "v1.11.1" 514 + hash = "sha256-sWfjkuKJyDllDEtnM8sb/pdLzPQmUYWYtmeWz/5suUc=" 515 515 [mod."github.com/urfave/cli/v3"] 516 516 version = "v3.3.3" 517 517 hash = "sha256-FdPiu7koY1qBinkfca4A05zCrX+Vu4eRz8wlRDZJyGg=" ··· 581 581 [mod."go.uber.org/zap"] 582 582 version = "v1.27.0" 583 583 hash = "sha256-8655KDrulc4Das3VRduO9MjCn8ZYD5WkULjCvruaYsU=" 584 + [mod."go.yaml.in/yaml/v2"] 585 + version = "v2.4.2" 586 + hash = "sha256-oC8RWdf1zbMYCtmR0ATy/kCkhIwPR9UqFZSMOKLVF/A=" 584 587 [mod."golang.org/x/crypto"] 585 - version = "v0.40.0" 586 - hash = "sha256-I6p2fqvz63P9MwAuoQrljI7IUbfZQvCem0ii4Q2zZng=" 588 + version = "v0.41.0" 589 + hash = "sha256-o5Di0lsFmYnXl7a5MBTqmN9vXMCRpE9ay71C1Ar8jEY=" 587 590 [mod."golang.org/x/exp"] 588 591 version = "v0.0.0-20250620022241-b7579e27df2b" 589 592 hash = "sha256-IsDTeuWLj4UkPO4NhWTvFeZ22WNtlxjoWiyAJh6zdig=" ··· 591 594 version = "v0.31.0" 592 595 hash = "sha256-ZFTlu9+4QToPPLA8C5UcG2eq/lQylq81RoG/WtYo9rg=" 593 596 [mod."golang.org/x/net"] 594 - version = "v0.42.0" 595 - hash = "sha256-YxileisIIez+kcAI+21kY5yk0iRuEqti2YdmS8jvP2s=" 597 + version = "v0.43.0" 598 + hash = "sha256-bf3iQFrsC8BoarVaS0uSspEFAcr1zHp1uziTtBpwV34=" 596 599 [mod."golang.org/x/sync"] 597 600 version = "v0.17.0" 598 601 hash = "sha256-M85lz4hK3/fzmcUViAp/CowHSxnr3BHSO7pjHp1O6i0=" 599 602 [mod."golang.org/x/sys"] 600 - version = "v0.34.0" 601 - hash = "sha256-5rZ7p8IaGli5X1sJbfIKOcOEwY4c0yQhinJPh2EtK50=" 603 + version = "v0.35.0" 604 + hash = "sha256-ZKM8pesQE6NAFZeKQ84oPn5JMhGr8g4TSwLYAsHMGSI=" 602 605 [mod."golang.org/x/text"] 603 606 version = "v0.29.0" 604 607 hash = "sha256-2cWBtJje+Yc+AnSgCANqBlIwnOMZEGkpQ2cFI45VfLI=" ··· 618 621 version = "v1.73.0" 619 622 hash = "sha256-LfVlwip++q2DX70RU6CxoXglx1+r5l48DwlFD05G11c=" 620 623 [mod."google.golang.org/protobuf"] 621 - version = "v1.36.6" 622 - hash = "sha256-lT5qnefI5FDJnowz9PEkAGylH3+fE+A3DJDkAyy9RMc=" 624 + version = "v1.36.8" 625 + hash = "sha256-yZN8ZON0b5HjUNUSubHst7zbvnMsOzd81tDPYQRtPgM=" 623 626 [mod."gopkg.in/fsnotify.v1"] 624 627 version = "v1.4.7" 625 628 hash = "sha256-j/Ts92oXa3k1MFU7Yd8/AqafRTsFn7V2pDKCyDJLah8="
+20
nix/pkgs/knot-mirror.nix
··· 1 + { 2 + buildGoApplication, 3 + modules, 4 + sqlite-lib, 5 + src, 6 + }: 7 + buildGoApplication { 8 + pname = "knotmirror"; 9 + version = "0.1.0"; 10 + inherit src modules; 11 + 12 + doCheck = false; 13 + 14 + subPackages = ["cmd/knotmirror"]; 15 + tags = ["libsqlite3"]; 16 + 17 + env.CGO_CFLAGS = "-I ${sqlite-lib}/include "; 18 + env.CGO_LDFLAGS = "-L ${sqlite-lib}/lib"; 19 + CGO_ENABLED = 1; 20 + }
+5
tap/tap.go
··· 132 132 // TODO: keep websocket connection alive 133 133 conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil) 134 134 if err != nil { 135 + l.Error("failed to connect to tap", "err", err) 135 136 return err 136 137 } 137 138 defer conn.Close() 139 + defer func() { 140 + l.Warn("closed tap conection") 141 + }() 142 + l.Info("established tap conection") 138 143 139 144 for { 140 145 select {