Monorepo for Tangled

wip: knotmirror: introduce knotmirror

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 63c82b42 a9a1566c

verified
+1677 -39
+1
.gitignore
··· 19 19 # Created if following hacking.md 20 20 genjwks.out 21 21 /nix/vm-data 22 + /mirror
+51
cmd/knotmirror/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "os" 7 + "os/signal" 8 + "syscall" 9 + 10 + "github.com/carlmjohnson/versioninfo" 11 + "github.com/urfave/cli/v3" 12 + "tangled.org/core/knotmirror" 13 + "tangled.org/core/log" 14 + ) 15 + 16 + func main() { 17 + if err := run(os.Args); err != nil { 18 + slog.Error("error running knotmirror", "err", err) 19 + os.Exit(-1) 20 + } 21 + } 22 + 23 + func run(args []string) error { 24 + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 25 + defer cancel() 26 + 27 + logger := log.New("knotmirror") 28 + slog.SetDefault(logger) 29 + ctx = log.IntoContext(ctx, logger) 30 + 31 + app := cli.Command{ 32 + Name: "knotmirror", 33 + Usage: "knot mirroring service", 34 + Version: versioninfo.Short(), 35 + } 36 + app.Flags = []cli.Flag{} 37 + app.Commands = []*cli.Command{ 38 + { 39 + Name: "serve", 40 + Usage: "run the knotmirror daemon", 41 + Action: runKnotMirror, 42 + Flags: []cli.Flag{}, 43 + }, 44 + } 45 + return app.Run(ctx, args) 46 + } 47 + 48 + func runKnotMirror(ctx context.Context, cmd *cli.Command) error { 49 + // TODO: generate Config from arguments & pass down to Run() 50 + return knotmirror.Run(ctx) 51 + }
+3 -1
flake.nix
··· 99 99 bluesky-jetstream = self.callPackage ./nix/pkgs/bluesky-jetstream.nix {}; 100 100 bluesky-relay = self.callPackage ./nix/pkgs/bluesky-relay.nix {}; 101 101 tap = self.callPackage ./nix/pkgs/tap.nix {}; 102 + knotmirror = self.callPackage ./nix/pkgs/knot-mirror.nix {}; 102 103 }); 103 104 in { 104 105 overlays.default = final: prev: { 105 - inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly did-method-plc bluesky-jetstream bluesky-relay tap; 106 + inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly did-method-plc bluesky-jetstream bluesky-relay tap knotmirror; 106 107 }; 107 108 108 109 packages = forAllSystems (system: let ··· 191 192 pkgs.coreutils # for those of us who are on systems that use busybox (alpine) 192 193 packages'.lexgen 193 194 packages'.treefmt-wrapper 195 + packages'.tap 194 196 ]; 195 197 shellHook = '' 196 198 mkdir -p appview/pages/static
+9 -8
go.mod
··· 37 37 github.com/microcosm-cc/bluemonday v1.0.27 38 38 github.com/openbao/openbao/api/v2 v2.3.0 39 39 github.com/posthog/posthog-go v1.5.5 40 + github.com/prometheus/client_golang v1.23.2 40 41 github.com/redis/go-redis/v9 v9.7.3 41 42 github.com/resend/resend-go/v2 v2.15.0 42 43 github.com/sethvargo/go-envconfig v1.1.0 43 44 github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c 44 45 github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef 45 - github.com/stretchr/testify v1.10.0 46 + github.com/stretchr/testify v1.11.1 46 47 github.com/urfave/cli/v3 v3.3.3 47 48 github.com/whyrusleeping/cbor-gen v0.3.1 48 49 github.com/yuin/goldmark v1.7.13 49 50 github.com/yuin/goldmark-emoji v1.0.6 50 51 github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc 51 52 gitlab.com/staticnoise/goldmark-callout v0.0.0-20240609120641-6366b799e4ab 52 - golang.org/x/crypto v0.40.0 53 - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b 53 + golang.org/x/crypto v0.41.0 54 54 golang.org/x/image v0.31.0 55 - golang.org/x/net v0.42.0 55 + golang.org/x/net v0.43.0 56 56 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 57 57 gopkg.in/yaml.v3 v3.0.1 58 58 ) ··· 179 179 github.com/pkg/errors v0.9.1 // indirect 180 180 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect 181 181 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 182 - github.com/prometheus/client_golang v1.22.0 // indirect 183 182 github.com/prometheus/client_model v0.6.2 // indirect 184 - github.com/prometheus/common v0.64.0 // indirect 183 + github.com/prometheus/common v0.66.1 // indirect 185 184 github.com/prometheus/procfs v0.16.1 // indirect 186 185 github.com/rivo/uniseg v0.4.7 // indirect 187 186 github.com/ryanuber/go-glob v1.0.0 // indirect ··· 204 203 go.uber.org/atomic v1.11.0 // indirect 205 204 go.uber.org/multierr v1.11.0 // indirect 206 205 go.uber.org/zap v1.27.0 // indirect 206 + go.yaml.in/yaml/v2 v2.4.2 // indirect 
207 + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect 207 208 golang.org/x/sync v0.17.0 // indirect 208 - golang.org/x/sys v0.34.0 // indirect 209 + golang.org/x/sys v0.35.0 // indirect 209 210 golang.org/x/text v0.29.0 // indirect 210 211 golang.org/x/time v0.12.0 // indirect 211 212 google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect 212 213 google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect 213 214 google.golang.org/grpc v1.73.0 // indirect 214 - google.golang.org/protobuf v1.36.6 // indirect 215 + google.golang.org/protobuf v1.36.8 // indirect 215 216 gopkg.in/fsnotify.v1 v1.4.7 // indirect 216 217 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect 217 218 gopkg.in/warnings.v0 v0.1.2 // indirect
+20 -16
go.sum
··· 330 330 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= 331 331 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= 332 332 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 333 + github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= 334 + github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= 333 335 github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= 334 336 github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= 335 337 github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= ··· 431 433 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= 432 434 github.com/posthog/posthog-go v1.5.5 h1:2o3j7IrHbTIfxRtj4MPaXKeimuTYg49onNzNBZbwksM= 433 435 github.com/posthog/posthog-go v1.5.5/go.mod h1:3RqUmSnPuwmeVj/GYrS75wNGqcAKdpODiwc83xZWgdE= 434 - github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= 435 - github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= 436 + github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= 437 + github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= 436 438 github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= 437 439 github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= 438 - github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQPGO4= 439 - github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= 440 + github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= 441 + github.com/prometheus/common 
v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= 440 442 github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= 441 443 github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= 442 444 github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= ··· 482 484 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 483 485 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= 484 486 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= 485 - github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= 486 - github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 487 + github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= 488 + github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= 487 489 github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= 488 490 github.com/urfave/cli/v3 v3.3.3 h1:byCBaVdIXuLPIDm5CYZRVG6NvT7tv1ECqdU4YzlEa3I= 489 491 github.com/urfave/cli/v3 v3.3.3/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo= ··· 554 556 go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= 555 557 go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= 556 558 go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= 559 + go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= 560 + go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= 557 561 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= 558 562 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 559 563 golang.org/x/crypto 
v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= ··· 561 565 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= 562 566 golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= 563 567 golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= 564 - golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= 565 - golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= 568 + golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= 569 + golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= 566 570 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= 567 571 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= 568 572 golang.org/x/image v0.31.0 h1:mLChjE2MV6g1S7oqbXC0/UcKijjm5fnJLUYKIYrLESA= ··· 597 601 golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= 598 602 golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= 599 603 golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= 600 - golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= 601 - golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= 604 + golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= 605 + golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= 602 606 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 603 607 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 604 608 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= ··· 638 642 
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 639 643 golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 640 644 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 641 - golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= 642 - golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= 645 + golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= 646 + golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= 643 647 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= 644 648 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= 645 649 golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= ··· 649 653 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= 650 654 golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= 651 655 golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= 652 - golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= 653 - golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= 656 + golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= 657 + golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= 654 658 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= 655 659 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 656 660 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= ··· 703 707 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= 704 708 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= 705 709 
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= 706 - google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= 707 - google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= 710 + google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= 711 + google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= 708 712 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 709 713 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 710 714 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+45
knotmirror/adminpage.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "database/sql" 5 + "net/http" 6 + "strconv" 7 + 8 + "tangled.org/core/appview/pagination" 9 + "tangled.org/core/knotmirror/db" 10 + "tangled.org/core/orm" 11 + ) 12 + 13 + type AdminServer struct { 14 + db *sql.DB 15 + } 16 + 17 + const pageSize = 20 18 + 19 + func (s *AdminServer) handleRepos(w http.ResponseWriter, r *http.Request) { 20 + pageNum, _ := strconv.Atoi(r.URL.Query().Get("page")) 21 + if pageNum < 1 { 22 + pageNum = 1 23 + } 24 + var ( 25 + did = r.URL.Query().Get("did") 26 + knot = r.URL.Query().Get("knot") 27 + state = r.URL.Query().Get("state") 28 + ) 29 + 30 + var page pagination.Page 31 + var filters []orm.Filter 32 + 33 + if did != "" { 34 + filters = append(filters, orm.FilterEq("did", did)) 35 + } 36 + if knot != "" { 37 + filters = append(filters, orm.FilterEq("knot_domain", knot)) 38 + } 39 + if state != "" { 40 + filters = append(filters, orm.FilterEq("state", state)) 41 + } 42 + 43 + _, _ = db.ListRepos(s.db, page, filters...) 44 + panic("unimplemented") 45 + }
+32
knotmirror/config/config.go
··· 1 + package config 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "github.com/sethvargo/go-envconfig" 8 + ) 9 + 10 + type Config struct { 11 + TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 + DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 13 + KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not schema is not specified 14 + GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"` 15 + GitRepoFetchTimeout time.Duration `env:"MIRROR_GIT_FETCH_TIMEOUT, default=300s"` 16 + ResyncParallelism int `env:"MIRROR_RESYNC_PARALLELISM, default=5"` 17 + Slurper SlurperConfig `env:",prefix=MIRROR_SLURPER_"` 18 + MetricsListen string `env:"MIRROR_METRICS_LISTEN, default=:7100"` 19 + } 20 + 21 + type SlurperConfig struct { 22 + PersistCursorPeriod time.Duration `env:"PERSIST_CURSOR_PERIOD, default=4s"` 23 + ConcurrencyPerHost int `env:"CONCURRENCY, default=40"` 24 + } 25 + 26 + func Load(ctx context.Context) (*Config, error) { 27 + var cfg Config 28 + if err := envconfig.Process(ctx, &cfg); err != nil { 29 + return nil, err 30 + } 31 + return &cfg, nil 32 + }
+25
knotmirror/crawler.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "log/slog" 7 + 8 + "tangled.org/core/log" 9 + ) 10 + 11 + type Crawler struct { 12 + logger *slog.Logger 13 + db *sql.DB 14 + } 15 + 16 + func NewCrawler(l *slog.Logger, db *sql.DB) *Crawler { 17 + return &Crawler{ 18 + logger: log.SubLogger(l, "crawler"), 19 + db: db, 20 + } 21 + } 22 + 23 + func (c *Crawler) Start(ctx context.Context) { 24 + // TODO: repository crawler 25 + }
+68
knotmirror/db/db.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "strings" 8 + _ "github.com/mattn/go-sqlite3" 9 + ) 10 + 11 + func Make(ctx context.Context, dbPath string) (*sql.DB, error) { 12 + // https://github.com/mattn/go-sqlite3#connection-string 13 + opts := []string{ 14 + "_foreign_keys=1", 15 + "_journal_mode=WAL", 16 + "_synchronous=NORMAL", 17 + "_auto_vacuum=incremental", 18 + } 19 + 20 + db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 21 + if err != nil { 22 + return nil, err 23 + } 24 + 25 + conn, err := db.Conn(ctx) 26 + if err != nil { 27 + return nil, err 28 + } 29 + defer conn.Close() 30 + 31 + _, err = conn.ExecContext(ctx, ` 32 + create table if not exists repos ( 33 + did text not null, 34 + rkey text not null, 35 + at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored, 36 + cid text not null, 37 + 38 + -- record content 39 + name text not null, 40 + knot_domain text not null, 41 + 42 + -- sync data 43 + git_rev text not null, 44 + repo_sha text not null, 45 + state text not null default 'pending', 46 + error_msg text, 47 + retry_count integer not null default 0, 48 + retry_after integer not null default 0, 49 + 50 + unique(did, rkey) 51 + ); 52 + 53 + -- knot hosts 54 + create table if not exists hosts ( 55 + hostname text not null, 56 + no_ssl integer not null default 0, 57 + status text not null default 'active', 58 + last_seq integer not null default -1, 59 + 60 + unique(hostname) 61 + ); 62 + `) 63 + if err != nil { 64 + return nil, fmt.Errorf("initializing db schema: %w", err) 65 + } 66 + 67 + return db, nil 68 + }
+100
knotmirror/db/hosts.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log" 9 + 10 + "tangled.org/core/knotmirror/models" 11 + ) 12 + 13 + func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error { 14 + if _, err := e.ExecContext(ctx, 15 + `insert into hosts (hostname, no_ssl, status, last_seq) 16 + values (?, ?, ?, ?) 17 + on conflict(hostname) do update set 18 + no_ssl = excluded.no_ssl, 19 + status = excluded.status, 20 + last_seq = excluded.last_seq 21 + `, 22 + host.Hostname, 23 + host.NoSSL, 24 + host.Status, 25 + host.LastSeq, 26 + ); err != nil { 27 + return fmt.Errorf("upserting host: %w", err) 28 + }; 29 + return nil 30 + } 31 + 32 + func GetHost(ctx context.Context, e *sql.DB, hostname string) (*models.Host, error) { 33 + var host models.Host 34 + if err := e.QueryRowContext(ctx, 35 + `select hostname, no_ssl, status, last_seq 36 + from hosts where hostname = ?`, 37 + hostname, 38 + ).Scan( 39 + &host.Hostname, 40 + &host.NoSSL, 41 + &host.Status, 42 + &host.LastSeq, 43 + ); err != nil { 44 + if errors.Is(err, sql.ErrNoRows) { 45 + return nil, nil 46 + } 47 + return nil, err 48 + }; 49 + return &host, nil 50 + } 51 + 52 + func StoreCursors(ctx context.Context, e *sql.DB, cursors []models.HostCursor) error { 53 + tx, err := e.BeginTx(ctx, nil) 54 + if err != nil { 55 + return fmt.Errorf("starting transaction: %w", err) 56 + } 57 + defer tx.Rollback() 58 + for _, cur := range cursors { 59 + if cur.LastSeq <= 0 { 60 + continue 61 + } 62 + if _, err := tx.ExecContext(ctx, 63 + `update hosts set last_seq = ? 
where hostname = ?`, 64 + cur.LastSeq, 65 + cur.Hostname, 66 + ); err != nil { 67 + log.Println("failed to persist host cursor", "host:", cur.Hostname, "lastSeq", cur.LastSeq) 68 + } 69 + } 70 + return tx.Commit() 71 + } 72 + 73 + func ListHosts(ctx context.Context, e *sql.DB) ([]models.Host, error) { 74 + rows, err := e.QueryContext(ctx, 75 + `select hostname, no_ssl, status, last_seq 76 + from hosts where status = 'active'`, 77 + ) 78 + if err != nil { 79 + return nil, fmt.Errorf("querying hosts: %w", err) 80 + } 81 + defer rows.Close() 82 + 83 + var hosts []models.Host 84 + for rows.Next() { 85 + var host models.Host 86 + if err := rows.Scan( 87 + &host.Hostname, 88 + &host.NoSSL, 89 + &host.Status, 90 + &host.LastSeq, 91 + ); err != nil { 92 + return nil, fmt.Errorf("scanning row: %w", err) 93 + } 94 + hosts = append(hosts, host) 95 + } 96 + if err := rows.Err(); err != nil { 97 + return nil, fmt.Errorf("scanning rows: %w ", err) 98 + } 99 + return hosts, nil 100 + }
+156
knotmirror/db/repos.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + "tangled.org/core/appview/pagination" 10 + "tangled.org/core/knotmirror/models" 11 + "tangled.org/core/orm" 12 + ) 13 + 14 + func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 15 + if _, err := e.ExecContext(ctx, 16 + `insert into repos (did, rkey, cid, name, knot_domain) 17 + values (?, ?, ?, ?, ?)`, 18 + did, rkey, cid, name, knot, 19 + ); err != nil { 20 + return fmt.Errorf("inserting repo: %w", err) 21 + } 22 + return nil 23 + } 24 + 25 + func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 26 + if _, err := e.ExecContext(ctx, 27 + `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 28 + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 29 + on conflict(did, rkey) do update set 30 + cid = excluded.cid, 31 + name = excluded.name, 32 + knot_domain = excluded.knot_domain, 33 + git_rev = excluded.git_rev, 34 + repo_sha = excluded.repo_sha, 35 + state = excluded.state, 36 + error_msg = excluded.error_msg, 37 + retry_count = excluded.retry_count, 38 + retry_after = excluded.retry_after`, 39 + // where repos.cid != excluded.cid`, 40 + repo.Did, 41 + repo.Rkey, 42 + repo.Cid, 43 + repo.Name, 44 + repo.KnotDomain, 45 + repo.GitRev, 46 + repo.RepoSha, 47 + repo.State, 48 + repo.ErrorMsg, 49 + repo.RetryCount, 50 + repo.RetryAfter, 51 + ); err != nil { 52 + return fmt.Errorf("upserting repo: %w", err) 53 + } 54 + return nil 55 + } 56 + 57 + func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 58 + if _, err := e.ExecContext(ctx, 59 + `update repos 60 + set state = ? 61 + where did = ? 
and rkey = ?`, 62 + state, 63 + did, rkey, 64 + ); err != nil { 65 + return fmt.Errorf("updating repo: %w", err) 66 + } 67 + return nil 68 + } 69 + 70 + func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 71 + if _, err := e.ExecContext(ctx, 72 + `delete from repos where did = ? and rkey = ?`, 73 + did, 74 + rkey, 75 + ); err != nil { 76 + return fmt.Errorf("deleting repo: %w", err) 77 + } 78 + return nil 79 + } 80 + 81 + func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) { 82 + var repo models.Repo 83 + if err := e.QueryRowContext(ctx, 84 + `select 85 + did, 86 + rkey, 87 + cid, 88 + name, 89 + knot_domain, 90 + git_rev, 91 + repo_sha, 92 + state, 93 + error_msg, 94 + retry_count, 95 + retry_after 96 + from repos 97 + where did = ? and name = ?`, 98 + did, 99 + name, 100 + ).Scan( 101 + &repo.Did, 102 + &repo.Rkey, 103 + &repo.Cid, 104 + &repo.Name, 105 + &repo.KnotDomain, 106 + &repo.GitRev, 107 + &repo.RepoSha, 108 + &repo.State, 109 + &repo.ErrorMsg, 110 + &repo.RetryCount, 111 + &repo.RetryAfter, 112 + ); err != nil { 113 + return nil, fmt.Errorf("querying repo: %w", err) 114 + } 115 + return &repo, nil 116 + } 117 + 118 + func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) { 119 + var repo models.Repo 120 + if err := e.QueryRowContext(ctx, 121 + `select 122 + did, 123 + rkey, 124 + cid, 125 + name, 126 + knot_domain, 127 + git_rev, 128 + repo_sha, 129 + state, 130 + error_msg, 131 + retry_count, 132 + retry_after 133 + from repos 134 + where at_uri = ?`, 135 + aturi, 136 + ).Scan( 137 + &repo.Did, 138 + &repo.Rkey, 139 + &repo.Cid, 140 + &repo.Name, 141 + &repo.KnotDomain, 142 + &repo.GitRev, 143 + &repo.RepoSha, 144 + &repo.State, 145 + &repo.ErrorMsg, 146 + &repo.RetryCount, 147 + &repo.RetryAfter, 148 + ); err != nil { 149 + return nil, fmt.Errorf("querying repo: %w", err) 150 + } 151 + return &repo, nil 152 + } 153 + 154 
+ func ListRepos(e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) { 155 + panic("unimplemented") 156 + }
+95
knotmirror/knotmirror.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "time" 8 + 9 + "github.com/prometheus/client_golang/prometheus/promhttp" 10 + "tangled.org/core/knotmirror/config" 11 + "tangled.org/core/knotmirror/db" 12 + "tangled.org/core/knotmirror/knotstream" 13 + "tangled.org/core/log" 14 + ) 15 + 16 + func Run(ctx context.Context) error { 17 + // make sure every services are cleaned up on fast return 18 + ctx, cancel := context.WithCancel(ctx) 19 + defer cancel() 20 + 21 + logger := log.FromContext(ctx) 22 + cfg, err := config.Load(ctx) 23 + if err != nil { 24 + return fmt.Errorf("loading config: %w", err) 25 + } 26 + 27 + logger.Debug("config:", "config", cfg) 28 + 29 + db, err := db.Make(ctx, cfg.DbPath) 30 + if err != nil { 31 + return fmt.Errorf("initializing db: %w", err) 32 + } 33 + 34 + knotstream := knotstream.NewKnotStream(logger, db, cfg) 35 + crawler := NewCrawler(logger, db) 36 + resyncer := NewResyncer(logger, db, cfg) 37 + 38 + // maintain repository list with tap 39 + // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 
40 + tap := NewTapClient(logger, cfg, db, knotstream) 41 + 42 + // start metrics endpoint 43 + go func() { 44 + metricsAddr := cfg.MetricsListen 45 + logger.Info("starting metrics server", "addr", metricsAddr) 46 + http.Handle("/metrics", promhttp.Handler()) 47 + if err := http.ListenAndServe(metricsAddr, nil); err != nil { 48 + logger.Error("metrics server failed", "error", err) 49 + } 50 + }() 51 + 52 + tap.Start(ctx) 53 + 54 + resyncer.Start(ctx) 55 + 56 + // periodically crawl the entire network to mirror the repositories 57 + crawler.Start(ctx) 58 + 59 + // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 60 + knotstream.Start(ctx) 61 + 62 + svcErr := make(chan error, 1) 63 + if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 64 + svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 65 + } 66 + 67 + logger.Info("startup complete") 68 + select { 69 + case <-ctx.Done(): 70 + logger.Info("received shutdown signal", "reason", ctx.Err()) 71 + case err := <-svcErr: 72 + if err != nil { 73 + logger.Error("service error", "error", err) 74 + } 75 + cancel() 76 + } 77 + 78 + logger.Info("shutting down knotmirror") 79 + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 80 + defer shutdownCancel() 81 + 82 + var errs []error 83 + if err := knotstream.Shutdown(shutdownCtx); err != nil { 84 + errs = append(errs, err) 85 + } 86 + if err := db.Close(); err != nil { 87 + errs = append(errs, err) 88 + } 89 + for _, err := range errs { 90 + logger.Error("error during shutdown", "err", err) 91 + } 92 + 93 + logger.Info("shutdown complete") 94 + return nil 95 + }
+87
knotmirror/knotstream/knotstream.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "log/slog" 8 + "time" 9 + 10 + "tangled.org/core/knotmirror/config" 11 + "tangled.org/core/knotmirror/db" 12 + "tangled.org/core/knotmirror/models" 13 + "tangled.org/core/log" 14 + ) 15 + 16 + type KnotStream struct { 17 + logger *slog.Logger 18 + db *sql.DB 19 + slurper *KnotSlurper 20 + } 21 + 22 + func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream { 23 + l = log.SubLogger(l, "knotstream") 24 + return &KnotStream{ 25 + logger: l, 26 + db: db, 27 + slurper: NewKnotSlurper(l, db, cfg.Slurper), 28 + } 29 + } 30 + 31 + func (s *KnotStream) Start(ctx context.Context) { 32 + go s.slurper.Run(ctx) 33 + } 34 + 35 + func (s *KnotStream) Shutdown(ctx context.Context) error { 36 + return s.slurper.Shutdown(ctx) 37 + } 38 + 39 + func (s *KnotStream) CheckIfSubscribed(hostname string) bool { 40 + return s.slurper.CheckIfSubscribed(hostname) 41 + } 42 + 43 + func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 44 + s.logger.Debug("subscribe", "nossl", noSSL) 45 + host, err := db.GetHost(ctx, s.db, hostname) 46 + if err != nil { 47 + return fmt.Errorf("loading host from db: %w", err) 48 + } 49 + 50 + if host == nil { 51 + host = &models.Host{ 52 + Hostname: hostname, 53 + NoSSL: noSSL, 54 + Status: models.HostStatusActive, 55 + LastSeq: 0, 56 + } 57 + 58 + if err := db.UpsertHost(ctx, s.db, host); err != nil { 59 + return fmt.Errorf("adding host to db: %w", err) 60 + } 61 + 62 + s.logger.Info("adding new host subscription", "hostname", hostname, "noSSL", noSSL) 63 + } 64 + 65 + if host.Status == models.HostStatusBanned { 66 + return fmt.Errorf("cannot subscribe to banned knot") 67 + } 68 + return s.slurper.Subscribe(ctx, *host) 69 + } 70 + 71 + func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 72 + hosts, err := db.ListHosts(ctx, s.db) 73 + if err != nil { 74 + return fmt.Errorf("listing hosts: %w", 
err) 75 + } 76 + 77 + for _, host := range hosts { 78 + l := s.logger.With("hostname", host.Hostname) 79 + l.Info("re-subscribing to active host") 80 + if err := s.slurper.Subscribe(ctx, host); err != nil { 81 + l.Warn("failed to re-subscribe to host", "err", err) 82 + } 83 + // sleep for a very short period, so we don't open tons of sockets at the same time 84 + time.Sleep(1 * time.Millisecond) 85 + } 86 + return nil 87 + }
+28
knotmirror/knotstream/metrics.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // KnotStream metrics 9 + var ( 10 + knotstreamEventsReceived = promauto.NewCounter(prometheus.CounterOpts{ 11 + Name: "knotmirror_knotstream_events_received_total", 12 + Help: "Total number of events received from knotstream", 13 + }) 14 + knotstreamEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{ 15 + Name: "knotmirror_knotstream_events_processed_total", 16 + Help: "Total number of events successfully processed", 17 + }) 18 + knotstreamEventsSkipped = promauto.NewCounter(prometheus.CounterOpts{ 19 + Name: "knotmirror_knotstream_events_skipped_total", 20 + Help: "Total number of events skipped (not tracked)", 21 + }) 22 + ) 23 + 24 + // slurper metrics 25 + var connectedInbound = promauto.NewGauge(prometheus.GaugeOpts{ 26 + Name: "knotmirror_connected_inbound", 27 + Help: "Number of inbound knotstream we are consuming", 28 + })
+102
knotmirror/knotstream/scheduler.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "sync" 7 + "sync/atomic" 8 + "time" 9 + 10 + "tangled.org/core/log" 11 + ) 12 + 13 + type ParallelScheduler struct { 14 + concurrency int 15 + 16 + do func(ctx context.Context, task *Task) error 17 + 18 + feeder chan *Task 19 + lk sync.Mutex 20 + scheduled map[string][]*Task 21 + lastSeq atomic.Int64 22 + 23 + logger *slog.Logger 24 + } 25 + 26 + type Task struct { 27 + key string 28 + message []byte 29 + } 30 + 31 + func NewParallelScheduler(maxC int, ident string, do func(context.Context, *Task) error) *ParallelScheduler { 32 + return &ParallelScheduler{ 33 + concurrency: maxC, 34 + do: do, 35 + feeder: make(chan *Task), 36 + scheduled: make(map[string][]*Task), 37 + logger: log.New("parallel-scheduler"), 38 + } 39 + } 40 + 41 + func (s *ParallelScheduler) Start(ctx context.Context) { 42 + for range s.concurrency { 43 + go s.ForEach(ctx, s.do) 44 + } 45 + } 46 + 47 + func (s *ParallelScheduler) AddTask(ctx context.Context, task *Task) { 48 + s.lk.Lock() 49 + if st, ok := s.scheduled[task.key]; ok { 50 + // schedule task 51 + s.scheduled[task.key] = append(st, task) 52 + s.lk.Unlock() 53 + return 54 + } 55 + s.scheduled[task.key] = []*Task{} 56 + s.lk.Unlock() 57 + 58 + select { 59 + case <-ctx.Done(): 60 + return 61 + case s.feeder <- task: 62 + return 63 + } 64 + } 65 + 66 + func (s *ParallelScheduler) ForEach(ctx context.Context, fn func(context.Context, *Task) error) { 67 + for task := range s.feeder { 68 + for task != nil { 69 + select { 70 + case <-ctx.Done(): 71 + return 72 + default: 73 + } 74 + if err := fn(ctx, task); err != nil { 75 + s.logger.Error("event handler failed", "err", err) 76 + } 77 + 78 + s.lk.Lock() 79 + func() { 80 + rem, ok := s.scheduled[task.key] 81 + if !ok { 82 + s.logger.Error("should always have an 'active' entry if a worker is processing a job") 83 + } 84 + if len(rem) == 0 { 85 + delete(s.scheduled, task.key) 86 + task = nil 87 + } else { 88 + task = 
rem[0] 89 + s.scheduled[task.key] = rem[1:] 90 + } 91 + 92 + // TODO: update seq from received message 93 + s.lastSeq.Store(time.Now().UnixNano()) 94 + }() 95 + s.lk.Unlock() 96 + } 97 + } 98 + } 99 + 100 + func (s *ParallelScheduler) LastSeq() int64 { 101 + return s.lastSeq.Load() 102 + }
+332
knotmirror/knotstream/slurper.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "net/http" 11 + "sync" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/util/ssrf" 16 + "github.com/carlmjohnson/versioninfo" 17 + "github.com/gorilla/websocket" 18 + "tangled.org/core/api/tangled" 19 + "tangled.org/core/knotmirror/config" 20 + "tangled.org/core/knotmirror/db" 21 + "tangled.org/core/knotmirror/models" 22 + "tangled.org/core/log" 23 + ) 24 + 25 + type KnotSlurper struct { 26 + logger *slog.Logger 27 + db *sql.DB 28 + cfg config.SlurperConfig 29 + 30 + subsLk sync.Mutex 31 + subs map[string]*subscription 32 + } 33 + 34 + func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper { 35 + return &KnotSlurper{ 36 + logger: log.SubLogger(l, "slurper"), 37 + db: db, 38 + cfg: cfg, 39 + subs: make(map[string]*subscription), 40 + } 41 + } 42 + 43 + func (s *KnotSlurper) Run(ctx context.Context) { 44 + for { 45 + select { 46 + case <-ctx.Done(): 47 + return 48 + case <-time.After(s.cfg.PersistCursorPeriod): 49 + if err := s.persistCursors(ctx); err != nil { 50 + s.logger.Error("failed to flush cursors", "err", err) 51 + } 52 + } 53 + } 54 + } 55 + 56 + func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool { 57 + s.subsLk.Lock() 58 + defer s.subsLk.Unlock() 59 + 60 + _, ok := s.subs[hostname] 61 + return ok 62 + } 63 + 64 + func (s *KnotSlurper) Shutdown(ctx context.Context) error { 65 + s.logger.Info("starting shutdown host cursor flush") 66 + err := s.persistCursors(ctx) 67 + if err != nil { 68 + s.logger.Error("shutdown error", "err", err) 69 + } 70 + s.logger.Info("slurper shutdown complete") 71 + return err 72 + } 73 + 74 + func (s *KnotSlurper) persistCursors(ctx context.Context) error { 75 + // gather cursor list from subscriptions and store them to DB 76 + // start := time.Now() 77 + 78 + s.subsLk.Lock() 79 + 
cursors := make([]models.HostCursor, len(s.subs)) 80 + i := 0 81 + for _, sub := range s.subs { 82 + cursors[i] = sub.HostCursor() 83 + i++ 84 + } 85 + s.subsLk.Unlock() 86 + 87 + err := db.StoreCursors(ctx, s.db, cursors) 88 + // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 89 + return err 90 + } 91 + 92 + func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error { 93 + s.subsLk.Lock() 94 + defer s.subsLk.Unlock() 95 + 96 + _, ok := s.subs[host.Hostname] 97 + if ok { 98 + return fmt.Errorf("already subscribed: %s", host.Hostname) 99 + } 100 + 101 + // TODO: include `cancel` function to kill subscription by hostname 102 + sub := &subscription{ 103 + hostname: host.Hostname, 104 + scheduler: NewParallelScheduler( 105 + s.cfg.ConcurrencyPerHost, 106 + host.Hostname, 107 + s.ProcessEvent, 108 + ), 109 + } 110 + s.subs[host.Hostname] = sub 111 + 112 + sub.scheduler.Start(ctx) 113 + go s.subscribeWithRedialer(ctx, host, sub) 114 + return nil 115 + } 116 + 117 + func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) { 118 + l := s.logger.With("host", host.Hostname) 119 + 120 + dialer := websocket.Dialer{ 121 + HandshakeTimeout: time.Second * 5, 122 + } 123 + 124 + // if this isn't a localhost / private connection, then we should enable SSRF protections 125 + if !host.NoSSL { 126 + netDialer := ssrf.PublicOnlyDialer() 127 + dialer.NetDialContext = netDialer.DialContext 128 + } 129 + 130 + cursor := host.LastSeq 131 + 132 + connectedInbound.Inc() 133 + defer connectedInbound.Dec() 134 + 135 + var backoff int 136 + for { 137 + select { 138 + case <-ctx.Done(): 139 + return 140 + default: 141 + } 142 + u := host.LegacyEventsURL(cursor) 143 + l.Debug("made url with cursor", "cursor", cursor, "url", u) 144 + 145 + // NOTE: manual backoff retry implementation to explicitly handle fails 146 + hdr := make(http.Header) 147 + 
hdr.Add("User-Agent", userAgent()) 148 + conn, resp, err := dialer.DialContext(ctx, u, hdr) 149 + if err != nil { 150 + l.Warn("dialing failed", "err", err, "backoff", backoff) 151 + time.Sleep(sleepForBackoff(backoff)) 152 + backoff++ 153 + if backoff > 15 { 154 + l.Warn("host does not appear to be online, disabling for now") 155 + host.Status = models.HostStatusOffline 156 + if err := db.UpsertHost(ctx, s.db, &host); err != nil { 157 + l.Error("failed to update host status", "err", err) 158 + } 159 + return 160 + } 161 + continue 162 + } 163 + 164 + l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u) 165 + 166 + if err := s.handleConnection(ctx, conn, sub); err != nil { 167 + // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl 168 + l.Warn("host connection failed", "err", err, "backoff", backoff) 169 + } 170 + 171 + updatedCursor := sub.LastSeq() 172 + didProgress := updatedCursor > cursor 173 + l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress) 174 + if didProgress { 175 + cursor = updatedCursor 176 + backoff = 0 177 + 178 + batch := []models.HostCursor{sub.HostCursor()} 179 + if err := db.StoreCursors(ctx, s.db, batch); err != nil { 180 + l.Error("failed to store cursors", "err", err) 181 + } 182 + } 183 + } 184 + } 185 + 186 + // handleConnection handles websocket connection. 
187 + // Schedules task from received event and return when connection is closed 188 + func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error { 189 + // ping on every 30s 190 + ctx, cancel := context.WithCancel(ctx) 191 + defer cancel() // close the background ping job on connection close 192 + go func() { 193 + t := time.NewTicker(30 * time.Second) 194 + defer t.Stop() 195 + failcount := 0 196 + 197 + for { 198 + select { 199 + case <-t.C: 200 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 201 + s.logger.Warn("failed to ping", "err", err) 202 + failcount++ 203 + if failcount >= 4 { 204 + s.logger.Error("too many ping fails", "count", failcount) 205 + _ = conn.Close() 206 + return 207 + } 208 + } else { 209 + failcount = 0 // ok ping 210 + } 211 + case <-ctx.Done(): 212 + _ = conn.Close() 213 + return 214 + } 215 + } 216 + }() 217 + 218 + conn.SetPingHandler(func(message string) error { 219 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute)) 220 + if err == websocket.ErrCloseSent { 221 + return nil 222 + } 223 + return err 224 + }) 225 + conn.SetPongHandler(func(_ string) error { 226 + if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 227 + s.logger.Error("failed to set read deadline", "err", err) 228 + } 229 + return nil 230 + }) 231 + 232 + for { 233 + select { 234 + case <-ctx.Done(): 235 + return ctx.Err() 236 + default: 237 + } 238 + msgType, msg, err := conn.ReadMessage() 239 + if err != nil { 240 + return err 241 + } 242 + 243 + if msgType != websocket.TextMessage { 244 + continue 245 + } 246 + 247 + sub.scheduler.AddTask(ctx, &Task{ 248 + key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 249 + message: msg, 250 + }) 251 + } 252 + } 253 + 254 + type LegacyGitEvent struct { 255 + Rkey string 256 + Nsid string 257 + Event tangled.GitRefUpdate 258 + } 
259 + 260 + func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { 261 + var legacyMessage LegacyGitEvent 262 + if err := json.Unmarshal(task.message, &legacyMessage); err != nil { 263 + return fmt.Errorf("unmarshaling message: %w", err) 264 + } 265 + 266 + if err := s.ProcessLegacyGitRefUpdate(ctx, &legacyMessage); err != nil { 267 + return fmt.Errorf("processing gitRefUpdate: %w", err) 268 + } 269 + return nil 270 + } 271 + 272 + func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, evt *LegacyGitEvent) error { 273 + knotstreamEventsReceived.Inc() 274 + 275 + curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(evt.Event.RepoDid), evt.Event.RepoName) 276 + if err != nil { 277 + return fmt.Errorf("failed to get repo '%s': %w", evt.Event.RepoDid+"/"+evt.Event.RepoName, err) 278 + } 279 + if curr == nil { 280 + // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 281 + // 282 + // Normally did+name is already enough to perform git-fetch as that's 283 + // what needed to fetch the repository. 284 + // But we want to store that in did/rkey in knot-mirror. 285 + // Therefore, we should ignore when the repository is unknown. 286 + // Hopefully crawler will sync it later. 
287 + s.logger.Warn("skipping event from unknown repo", "did/repo", evt.Event.RepoDid+"/"+evt.Event.RepoName) 288 + knotstreamEventsSkipped.Inc() 289 + return nil 290 + } 291 + l := s.logger.With("repoAt", curr.AtUri()) 292 + 293 + // TODO: should plan resync to resyncBuffer on RepoStateResyncing 294 + if curr.State != models.RepoStateActive { 295 + l.Debug("skipping non-active repo") 296 + knotstreamEventsSkipped.Inc() 297 + return nil 298 + } 299 + 300 + if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() { 301 + l.Debug("skipping replayed event", "eventRev", evt.Rkey, "currentRev", curr.GitRev) 302 + knotstreamEventsSkipped.Inc() 303 + return nil 304 + } 305 + 306 + // if curr.State == models.RepoStateResyncing { 307 + // firehoseEventsSkipped.Inc() 308 + // return fp.events.addToResyncBuffer(ctx, commit) 309 + // } 310 + 311 + // can't skip anything, update repo state 312 + if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil { 313 + return err 314 + } 315 + 316 + knotstreamEventsProcessed.Inc() 317 + return nil 318 + } 319 + 320 + func userAgent() string { 321 + return fmt.Sprintf("knotmirror/%s", versioninfo.Short()) 322 + } 323 + 324 + func sleepForBackoff(b int) time.Duration { 325 + if b == 0 { 326 + return 0 327 + } 328 + if b < 10 { 329 + return (time.Duration(b) * 2) + (time.Millisecond * time.Duration(rand.Intn(1000))) 330 + } 331 + return time.Second * 30 332 + }
+22
knotmirror/knotstream/subscription.go
··· 1 + package knotstream 2 + 3 + import "tangled.org/core/knotmirror/models" 4 + 5 + // subscription represents websocket connection with that host 6 + type subscription struct { 7 + hostname string 8 + 9 + // embedded parallel job scheduler 10 + scheduler *ParallelScheduler 11 + } 12 + 13 + func (s *subscription) LastSeq() int64 { 14 + return s.scheduler.LastSeq() 15 + } 16 + 17 + func (s *subscription) HostCursor() models.HostCursor { 18 + return models.HostCursor{ 19 + Hostname: s.hostname, 20 + LastSeq: s.LastSeq(), 21 + } 22 + }
+29
knotmirror/metrics.go
package knotmirror

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Resync metrics
//
// All of them are created through promauto when the package is initialised.
var (
	// TODO:
	// - working / waiting resyncer counts

	// resyncsStarted counts every resync attempt, successful or not.
	resyncsStarted = promauto.NewCounter(prometheus.CounterOpts{
		Name: "knotmirror_resyncs_started_total",
		Help: "Total number of repo resyncs started",
	})
	// resyncsCompleted counts resyncs that finished successfully.
	resyncsCompleted = promauto.NewCounter(prometheus.CounterOpts{
		Name: "knotmirror_resyncs_completed_total",
		Help: "Total number of repo resyncs completed",
	})
	// resyncsFailed counts resyncs that did not succeed.
	resyncsFailed = promauto.NewCounter(prometheus.CounterOpts{
		Name: "knotmirror_resyncs_failed_total",
		Help: "Total number of repo resyncs failed",
	})
	// resyncDuration observes the duration of a resync in seconds.
	// Buckets double from 0.1s, 12 buckets in total.
	resyncDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "knotmirror_resync_duration_seconds",
		Help:    "Duration of repo resync operations",
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 12),
	})
)
+88
knotmirror/models/models.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 8 + ) 9 + 10 + type Repo struct { 11 + Did syntax.DID 12 + Rkey syntax.RecordKey 13 + Cid *syntax.CID 14 + // content of tangled.Repo 15 + Name string 16 + KnotDomain string 17 + 18 + GitRev syntax.TID // last processed git.refUpdate revision 19 + RepoSha string // sha256 sum of git refs (to avoid no-op git fetch) 20 + State RepoState 21 + ErrorMsg string 22 + RetryCount int 23 + RetryAfter int64 // Unix timestamp (seconds) 24 + } 25 + 26 + func (r *Repo) AtUri() syntax.ATURI { 27 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.RepoNSID, r.Rkey)) 28 + } 29 + 30 + func (r *Repo) DidSlashRepo() string { 31 + return fmt.Sprintf("%s/%s", r.Did, r.Name) 32 + } 33 + 34 + type RepoState string 35 + 36 + const ( 37 + RepoStatePending RepoState = "pending" 38 + RepoStateDesynchronized RepoState = "desynchronized" 39 + RepoStateResyncing RepoState = "resyncing" 40 + RepoStateActive RepoState = "active" 41 + RepoStateError RepoState = "error" 42 + ) 43 + 44 + type HostCursor struct { 45 + Hostname string 46 + LastSeq int64 47 + } 48 + 49 + type Host struct { 50 + Hostname string 51 + NoSSL bool 52 + Status HostStatus 53 + LastSeq int64 54 + } 55 + 56 + type HostStatus string 57 + 58 + const ( 59 + HostStatusActive HostStatus = "active" 60 + HostStatusIdle HostStatus = "idle" 61 + HostStatusOffline HostStatus = "offline" 62 + HostStatusThrottled HostStatus = "throttled" 63 + HostStatusBanned HostStatus = "banned" 64 + ) 65 + 66 + // func (h *Host) SubscribeGitRefsURL(cursor int64) string { 67 + // scheme := "wss" 68 + // if h.NoSSL { 69 + // scheme = "ws" 70 + // } 71 + // u := fmt.Sprintf("%s://%s/xrpc/%s", scheme, h.Hostname, tangled.SubscribeGitRefsNSID) 72 + // if cursor > 0 { 73 + // u = fmt.Sprintf("%s?cursor=%d", u, h.LastSeq) 74 + // } 75 + // return u 76 + // } 77 + 78 + func (h *Host) LegacyEventsURL(cursor 
int64) string { 79 + scheme := "wss" 80 + if h.NoSSL { 81 + scheme = "ws" 82 + } 83 + u := fmt.Sprintf("%s://%s/events", scheme, h.Hostname) 84 + if cursor > 0 { 85 + u = fmt.Sprintf("%s?cursor=%d", u, h.LastSeq) 86 + } 87 + return u 88 + }
+8
knotmirror/readme.md
# KnotMirror

Mirror of all known repos. Heavily inspired by [indigo/relay] and [indigo/tap].

KnotMirror syncs the repo list using tap and subscribes to all known knots as a KnotStream.

[indigo/relay]: https://github.com/bluesky-social/indigo/tree/main/cmd/relay
[indigo/tap]: https://github.com/bluesky-social/indigo/tree/main/cmd/tap
+242
knotmirror/resyncer.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "net/url" 11 + "path" 12 + "sync" 13 + "time" 14 + 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 + "github.com/go-git/go-git/v5" 17 + gitconfig "github.com/go-git/go-git/v5/config" 18 + "tangled.org/core/knotmirror/config" 19 + "tangled.org/core/knotmirror/db" 20 + "tangled.org/core/knotmirror/models" 21 + "tangled.org/core/log" 22 + ) 23 + 24 + type Resyncer struct { 25 + logger *slog.Logger 26 + db *sql.DB 27 + 28 + claimJobMu sync.Mutex 29 + 30 + repoBasePath string 31 + repoFetchTimeout time.Duration 32 + knotUseSSL bool 33 + 34 + parallelism int 35 + } 36 + 37 + func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer { 38 + return &Resyncer{ 39 + logger: log.SubLogger(l, "resyncer"), 40 + db: db, 41 + repoBasePath: cfg.GitRepoBasePath, 42 + repoFetchTimeout: cfg.GitRepoFetchTimeout, 43 + knotUseSSL: cfg.KnotUseSSL, 44 + parallelism: cfg.ResyncParallelism, 45 + } 46 + } 47 + 48 + func (r *Resyncer) Start(ctx context.Context) { 49 + for i := 0; i < r.parallelism; i++ { 50 + go r.runResyncWorker(ctx, i) 51 + } 52 + } 53 + 54 + func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 55 + l := r.logger.With("worker", workerID) 56 + for { 57 + select { 58 + case <-ctx.Done(): 59 + l.Info("resync worker shutting down", "error", ctx.Err()) 60 + return 61 + default: 62 + } 63 + repoAt, found, err := r.claimResyncJob(ctx) 64 + if err != nil { 65 + l.Error("failed to claim resync job", "error", err) 66 + time.Sleep(time.Second) 67 + continue 68 + } 69 + if !found { 70 + time.Sleep(time.Second) 71 + continue 72 + } 73 + l.Info("processing resync", "aturi", repoAt) 74 + if err := r.resyncRepo(ctx, repoAt); err != nil { 75 + l.Error("resync failed", "aturi", repoAt, "error", err) 76 + } 77 + } 78 + } 79 + 80 + func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 
81 + // use mutex to prevent duplicated jobs 82 + r.claimJobMu.Lock() 83 + defer r.claimJobMu.Unlock() 84 + 85 + var repoAt syntax.ATURI 86 + now := time.Now().Unix() 87 + if err := r.db.QueryRowContext(ctx, 88 + `update repos 89 + set state = ? 90 + where at_uri = ( 91 + select at_uri from repos 92 + where state in (?, ?, ?) 93 + and (retry_after = 0 or retry_after < ?) 94 + limit 1 95 + ) 96 + returning at_uri 97 + `, 98 + models.RepoStateResyncing, 99 + models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 100 + now, 101 + ).Scan(&repoAt); err != nil { 102 + if errors.Is(err, sql.ErrNoRows) { 103 + return "", false, nil 104 + } 105 + return "", false, err 106 + } 107 + 108 + return repoAt, true, nil 109 + } 110 + 111 + func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 112 + // ctx, span := tracer.Start(ctx, "resyncRepo") 113 + // span.SetAttributes(attribute.String("aturi", repoAt)) 114 + // defer span.End() 115 + 116 + resyncsStarted.Inc() 117 + startTime := time.Now() 118 + 119 + success, err := r.doResync(ctx, repoAt) 120 + if !success { 121 + resyncsFailed.Inc() 122 + resyncDuration.Observe(time.Since(startTime).Seconds()) 123 + return r.handleResyncError(ctx, repoAt, err) 124 + } 125 + 126 + resyncsCompleted.Inc() 127 + resyncDuration.Observe(time.Since(startTime).Seconds()) 128 + return nil 129 + } 130 + 131 + func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 132 + // ctx, span := tracer.Start(ctx, "doResync") 133 + // span.SetAttributes(attribute.String("aturi", repoAt)) 134 + // defer span.End() 135 + 136 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 137 + if err != nil { 138 + return false, fmt.Errorf("failed to get repo: %w", err) 139 + } 140 + 141 + repoPath := r.repoPath(repo) 142 + remoteUrl := r.repoRemoteURL(repo) 143 + l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath, "url", remoteUrl) 144 + 145 + // TODO: check if Knot is on backoff 
list. If so, return (false, nil) 146 + // TODO: use r.repoFetchTimeout on fetch 147 + // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 148 + gr, err := git.PlainOpen(repoPath) 149 + if errors.Is(err, git.ErrRepositoryNotExists) { 150 + l.Debug("cloning repo") 151 + // if err := exec.Command("git", "clone", "--mirror", remoteUrl, repoPath).Run(); err != nil { 152 + // return false, fmt.Errorf("cloning repo: %w", err) 153 + // } 154 + _, err := git.PlainCloneContext(ctx, repoPath, true, &git.CloneOptions{ 155 + URL: remoteUrl, 156 + Mirror: true, 157 + }) 158 + if err != nil { 159 + return false, fmt.Errorf("cloning repo: %w", err) 160 + } 161 + } else { 162 + if err != nil { 163 + return false, fmt.Errorf("laoding repo: %w", err) 164 + } 165 + l.Debug("fetching repo") 166 + // if err := exec.Command("git", "-C", repoPath, "fetch", "--mirror", remoteUrl).Run(); err != nil { 167 + // return false, fmt.Errorf("fetching repo: %w", err) 168 + // } 169 + if err := gr.FetchContext(ctx, &git.FetchOptions{ 170 + RemoteURL: remoteUrl, 171 + RefSpecs: []gitconfig.RefSpec{gitconfig.RefSpec("+refs/*:refs/*")}, 172 + Force: true, 173 + Prune: true, 174 + }); err != nil { 175 + return false, fmt.Errorf("fetching reppo: %w", err) 176 + } 177 + } 178 + 179 + // repo.GitRev = <processed git.refUpdate revision> 180 + // repo.RepoSha = <sha256 sum of git refs> 181 + repo.State = models.RepoStateActive 182 + repo.ErrorMsg = "" 183 + repo.RetryCount = 0 184 + repo.RetryAfter = 0 185 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 186 + return false, fmt.Errorf("updating repo state to active %w", err) 187 + } 188 + return true, nil 189 + } 190 + 191 + func (r *Resyncer) handleResyncError(ctx context.Context, repoAt syntax.ATURI, err error) error { 192 + r.logger.Debug("handleResyncError", "at_uri", repoAt, "err", err) 193 + var state models.RepoState 194 + var errMsg string 195 + if err == nil { 196 + state = models.RepoStateDesynchronized 
197 + errMsg = "" 198 + } else { 199 + state = models.RepoStateError 200 + errMsg = err.Error() 201 + } 202 + 203 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 204 + if err != nil { 205 + return err 206 + } 207 + 208 + // start a 1 min & go up to 1 hr between retries 209 + retryAfter := time.Now().Add(backoff(repo.RetryCount, 60) * 60) 210 + 211 + repo.State = state 212 + repo.ErrorMsg = errMsg 213 + repo.RetryCount = repo.RetryCount + 1 214 + repo.RetryAfter = retryAfter.Unix() 215 + if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 216 + return dbErr 217 + } 218 + return err 219 + } 220 + 221 + func (r *Resyncer) repoPath(repo *models.Repo) string { 222 + return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 223 + } 224 + 225 + func (r *Resyncer) repoRemoteURL(repo *models.Repo) string { 226 + u, _ := url.Parse(repo.KnotDomain) 227 + if u.Scheme == "" { 228 + if r.knotUseSSL { 229 + u.Scheme = "https" 230 + } else { 231 + u.Scheme = "http" 232 + } 233 + } 234 + u = u.JoinPath(repo.DidSlashRepo()) 235 + return u.String() 236 + } 237 + 238 + func backoff(retries int, max int) time.Duration { 239 + dur := min(1<<retries, max) 240 + jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 241 + return time.Second*time.Duration(dur) + jitter 242 + }
+93
knotmirror/tapclient.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + 10 + "tangled.org/core/api/tangled" 11 + "tangled.org/core/knotmirror/config" 12 + "tangled.org/core/knotmirror/db" 13 + "tangled.org/core/knotmirror/knotstream" 14 + "tangled.org/core/knotmirror/models" 15 + "tangled.org/core/log" 16 + "tangled.org/core/tap" 17 + ) 18 + 19 + type Tap struct { 20 + logger *slog.Logger 21 + cfg *config.Config 22 + tap tap.Client 23 + db *sql.DB 24 + ks *knotstream.KnotStream 25 + } 26 + 27 + func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap { 28 + return &Tap{ 29 + logger: log.SubLogger(l, "tapclient"), 30 + cfg: cfg, 31 + tap: tap.NewClient(cfg.TapUrl, ""), 32 + db: db, 33 + ks: ks, 34 + } 35 + } 36 + 37 + func (t *Tap) Start(ctx context.Context) { 38 + go t.tap.Connect(ctx, &tap.SimpleIndexer{ 39 + EventHandler: t.processEvent, 40 + }) 41 + } 42 + 43 + func (t *Tap) processEvent(ctx context.Context, evt tap.Event) error { 44 + l := t.logger.With("component", "tapIndexer") 45 + 46 + var err error 47 + switch evt.Type { 48 + case tap.EvtRecord: 49 + switch evt.Record.Collection.String() { 50 + case tangled.RepoNSID: 51 + err = t.processRepo(ctx, evt.Record) 52 + } 53 + } 54 + 55 + if err != nil { 56 + l.Error("failed to process message. 
will retry later", "event.ID", evt.ID, "err", err) 57 + return err 58 + } 59 + return nil 60 + } 61 + 62 + func (t *Tap) processRepo(ctx context.Context, evt *tap.RecordEventData) error { 63 + switch evt.Action { 64 + case tap.RecordCreateAction, tap.RecordUpdateAction: 65 + record := tangled.Repo{} 66 + if err := json.Unmarshal(evt.Record, &record); err != nil { 67 + return fmt.Errorf("parsing record: %w", err) 68 + } 69 + 70 + if err := db.UpsertRepo(ctx, t.db, &models.Repo{ 71 + Did: evt.Did, 72 + Rkey: evt.Rkey, 73 + Cid: evt.CID, 74 + Name: record.Name, 75 + KnotDomain: record.Knot, 76 + State: models.RepoStatePending, 77 + }); err != nil { 78 + return fmt.Errorf("upserting repo to db: %w", err) 79 + } 80 + 81 + if !t.ks.CheckIfSubscribed(record.Knot) { 82 + if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 83 + return fmt.Errorf("subscribing to knot: %w", err) 84 + } 85 + } 86 + 87 + case tap.RecordDeleteAction: 88 + if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 89 + return fmt.Errorf("deleting repo from db: %w", err) 90 + } 91 + } 92 + return nil 93 + }
+17 -14
nix/gomod2nix.toml
··· 470 470 version = "v1.5.5" 471 471 hash = "sha256-ouhfDUCXsfpcgaCLfJE9oYprAQHuV61OJzb/aEhT0j8=" 472 472 [mod."github.com/prometheus/client_golang"] 473 - version = "v1.22.0" 474 - hash = "sha256-OJ/9rlWG1DIPQJAZUTzjykkX0o+f+4IKLvW8YityaMQ=" 473 + version = "v1.23.2" 474 + hash = "sha256-3GD4fBFa1tJu8MS4TNP6r2re2eViUE+kWUaieIOQXCg=" 475 475 [mod."github.com/prometheus/client_model"] 476 476 version = "v0.6.2" 477 477 hash = "sha256-q6Fh6v8iNJN9ypD47LjWmx66YITa3FyRjZMRsuRTFeQ=" 478 478 [mod."github.com/prometheus/common"] 479 - version = "v0.64.0" 480 - hash = "sha256-uy3KO60F2Cvhamz3fWQALGSsy13JiTk3NfpXgRuwqtI=" 479 + version = "v0.66.1" 480 + hash = "sha256-bqHPaV9IV70itx63wqwgy2PtxMN0sn5ThVxDmiD7+Tk=" 481 481 [mod."github.com/prometheus/procfs"] 482 482 version = "v0.16.1" 483 483 hash = "sha256-OBCvKlLW2obct35p0L9Q+1ZrxZjpTmbgHMP2rng9hpo=" ··· 510 510 version = "v0.0.0-20220730225603-2ab79fcdd4ef" 511 511 hash = "sha256-/XmSE/J+f6FLWXGvljh6uBK71uoCAK3h82XQEQ1Ki68=" 512 512 [mod."github.com/stretchr/testify"] 513 - version = "v1.10.0" 514 - hash = "sha256-fJ4gnPr0vnrOhjQYQwJ3ARDKPsOtA7d4olQmQWR+wpI=" 513 + version = "v1.11.1" 514 + hash = "sha256-sWfjkuKJyDllDEtnM8sb/pdLzPQmUYWYtmeWz/5suUc=" 515 515 [mod."github.com/urfave/cli/v3"] 516 516 version = "v3.3.3" 517 517 hash = "sha256-FdPiu7koY1qBinkfca4A05zCrX+Vu4eRz8wlRDZJyGg=" ··· 581 581 [mod."go.uber.org/zap"] 582 582 version = "v1.27.0" 583 583 hash = "sha256-8655KDrulc4Das3VRduO9MjCn8ZYD5WkULjCvruaYsU=" 584 + [mod."go.yaml.in/yaml/v2"] 585 + version = "v2.4.2" 586 + hash = "sha256-oC8RWdf1zbMYCtmR0ATy/kCkhIwPR9UqFZSMOKLVF/A=" 584 587 [mod."golang.org/x/crypto"] 585 - version = "v0.40.0" 586 - hash = "sha256-I6p2fqvz63P9MwAuoQrljI7IUbfZQvCem0ii4Q2zZng=" 588 + version = "v0.41.0" 589 + hash = "sha256-o5Di0lsFmYnXl7a5MBTqmN9vXMCRpE9ay71C1Ar8jEY=" 587 590 [mod."golang.org/x/exp"] 588 591 version = "v0.0.0-20250620022241-b7579e27df2b" 589 592 hash = "sha256-IsDTeuWLj4UkPO4NhWTvFeZ22WNtlxjoWiyAJh6zdig=" ··· 591 
594 version = "v0.31.0" 592 595 hash = "sha256-ZFTlu9+4QToPPLA8C5UcG2eq/lQylq81RoG/WtYo9rg=" 593 596 [mod."golang.org/x/net"] 594 - version = "v0.42.0" 595 - hash = "sha256-YxileisIIez+kcAI+21kY5yk0iRuEqti2YdmS8jvP2s=" 597 + version = "v0.43.0" 598 + hash = "sha256-bf3iQFrsC8BoarVaS0uSspEFAcr1zHp1uziTtBpwV34=" 596 599 [mod."golang.org/x/sync"] 597 600 version = "v0.17.0" 598 601 hash = "sha256-M85lz4hK3/fzmcUViAp/CowHSxnr3BHSO7pjHp1O6i0=" 599 602 [mod."golang.org/x/sys"] 600 - version = "v0.34.0" 601 - hash = "sha256-5rZ7p8IaGli5X1sJbfIKOcOEwY4c0yQhinJPh2EtK50=" 603 + version = "v0.35.0" 604 + hash = "sha256-ZKM8pesQE6NAFZeKQ84oPn5JMhGr8g4TSwLYAsHMGSI=" 602 605 [mod."golang.org/x/text"] 603 606 version = "v0.29.0" 604 607 hash = "sha256-2cWBtJje+Yc+AnSgCANqBlIwnOMZEGkpQ2cFI45VfLI=" ··· 618 621 version = "v1.73.0" 619 622 hash = "sha256-LfVlwip++q2DX70RU6CxoXglx1+r5l48DwlFD05G11c=" 620 623 [mod."google.golang.org/protobuf"] 621 - version = "v1.36.6" 622 - hash = "sha256-lT5qnefI5FDJnowz9PEkAGylH3+fE+A3DJDkAyy9RMc=" 624 + version = "v1.36.8" 625 + hash = "sha256-yZN8ZON0b5HjUNUSubHst7zbvnMsOzd81tDPYQRtPgM=" 623 626 [mod."gopkg.in/fsnotify.v1"] 624 627 version = "v1.4.7" 625 628 hash = "sha256-j/Ts92oXa3k1MFU7Yd8/AqafRTsFn7V2pDKCyDJLah8="
+20
nix/pkgs/knot-mirror.nix
# Package definition of the knotmirror binary (cmd/knotmirror).
{
  buildGoApplication,
  modules,
  sqlite-lib,
  src,
}:
buildGoApplication {
  pname = "knotmirror";
  version = "0.1.0";
  inherit src modules;

  # tests are not run as part of the package build
  doCheck = false;

  # build only the knotmirror command
  subPackages = ["cmd/knotmirror"];
  tags = ["libsqlite3"];

  # point cgo at the headers and libraries of sqlite-lib
  env.CGO_CFLAGS = "-I ${sqlite-lib}/include ";
  env.CGO_LDFLAGS = "-L ${sqlite-lib}/lib";
  CGO_ENABLED = 1;
}
+4
tap/tap.go
··· 135 135 return err 136 136 } 137 137 defer conn.Close() 138 + defer func() { 139 + l.Warn("closed tap conection") 140 + }() 141 + l.Info("established tap conection") 138 142 139 143 for { 140 144 select {