Monorepo for Tangled

knotmirror: switch to postgres for concurrent writes

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me a81412ef b0daf11e

verified
+100 -91
+4
go.mod
··· 33 33 github.com/hiddeco/sshsig v0.2.0 34 34 github.com/hpcloud/tail v1.0.0 35 35 github.com/ipfs/go-cid v0.5.0 36 + github.com/jackc/pgx/v5 v5.8.0 36 37 github.com/mattn/go-sqlite3 v1.14.24 37 38 github.com/microcosm-cc/bluemonday v1.0.27 38 39 github.com/openbao/openbao/api/v2 v2.3.0 ··· 147 148 github.com/ipfs/go-log v1.0.5 // indirect 148 149 github.com/ipfs/go-log/v2 v2.6.0 // indirect 149 150 github.com/ipfs/go-metrics-interface v0.3.0 // indirect 151 + github.com/jackc/pgpassfile v1.0.0 // indirect 152 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect 153 + github.com/jackc/puddle/v2 v2.2.2 // indirect 150 154 github.com/json-iterator/go v1.1.12 // indirect 151 155 github.com/kevinburke/ssh_config v1.2.0 // indirect 152 156 github.com/klauspost/compress v1.18.0 // indirect
+8
go.sum
··· 309 309 github.com/ipfs/go-log/v2 v2.6.0/go.mod h1:p+Efr3qaY5YXpx9TX7MoLCSEZX5boSWj9wh86P5HJa8= 310 310 github.com/ipfs/go-metrics-interface v0.3.0 h1:YwG7/Cy4R94mYDUuwsBfeziJCVm9pBMJ6q/JR9V40TU= 311 311 github.com/ipfs/go-metrics-interface v0.3.0/go.mod h1:OxxQjZDGocXVdyTPocns6cOLwHieqej/jos7H4POwoY= 312 + github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= 313 + github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= 314 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= 315 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= 316 + github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= 317 + github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= 318 + github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= 319 + github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= 312 320 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= 313 321 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= 314 322 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+7 -25
knotmirror/adminpage.go
··· 13 13 "tangled.org/core/appview/pagination" 14 14 "tangled.org/core/knotmirror/db" 15 15 "tangled.org/core/knotmirror/models" 16 - "tangled.org/core/orm" 17 16 ) 18 17 19 18 //go:embed templates/*.html ··· 50 49 } 51 50 52 51 func (s *AdminServer) handleRepos() http.HandlerFunc { 53 - // TODO: prepare template 54 52 tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/repos.html")) 55 53 return func(w http.ResponseWriter, r *http.Request) { 56 54 pageNum, _ := strconv.Atoi(r.URL.Query().Get("page")) 57 55 if pageNum < 1 { 58 56 pageNum = 1 59 57 } 60 - var ( 61 - did = r.URL.Query().Get("did") 62 - knot = r.URL.Query().Get("knot") 63 - state = r.URL.Query().Get("state") 64 - ) 65 - 66 58 page := pagination.Page{ 67 59 Offset: (pageNum - 1) * repoPageSize, 68 60 Limit: repoPageSize, 69 61 } 70 - var filters []orm.Filter 71 62 72 - if did != "" { 73 - filters = append(filters, orm.FilterEq("did", did)) 74 - } 75 - if knot != "" { 76 - filters = append(filters, orm.FilterEq("knot_domain", knot)) 77 - } 78 - if state != "" { 79 - filters = append(filters, orm.FilterEq("state", state)) 80 - } 63 + var ( 64 + did = r.URL.Query().Get("did") 65 + knot = r.URL.Query().Get("knot") 66 + state = r.URL.Query().Get("state") 67 + ) 81 68 82 - repos, err := db.ListRepos(r.Context(), s.db, page, filters...) 69 + repos, err := db.ListRepos(r.Context(), s.db, page, did, knot, state) 83 70 if err != nil { 84 71 http.Error(w, err.Error(), http.StatusInternalServerError) 85 72 } ··· 106 93 return func(w http.ResponseWriter, r *http.Request) { 107 94 var status = r.URL.Query().Get("status") 108 95 109 - var filters []orm.Filter 110 - if status != "" { 111 - filters = append(filters, orm.FilterEq("status", status)) 112 - } 113 - 114 - hosts, err := db.ListHosts(r.Context(), s.db, filters...) 96 + hosts, err := db.ListHosts(r.Context(), s.db, models.HostStatus(status)) 115 97 if err != nil { 116 98 http.Error(w, err.Error(), http.StatusInternalServerError) 117 99 }
+22 -15
knotmirror/db/db.go
··· 4 4 "context" 5 5 "database/sql" 6 6 "fmt" 7 - "strings" 7 + "time" 8 + 8 9 _ "github.com/mattn/go-sqlite3" 10 + _ "github.com/jackc/pgx/v5/stdlib" 9 11 ) 10 12 11 - func Make(ctx context.Context, dbPath string) (*sql.DB, error) { 12 - // https://github.com/mattn/go-sqlite3#connection-string 13 - opts := []string{ 14 - "_foreign_keys=1", 15 - "_journal_mode=WAL", 16 - "_synchronous=NORMAL", 17 - "_auto_vacuum=incremental", 13 + func Make(ctx context.Context, dbUrl string, maxConns int) (*sql.DB, error) { 14 + db, err := sql.Open("pgx", dbUrl) 15 + if err != nil { 16 + return nil, fmt.Errorf("opening db: %w", err) 18 17 } 19 18 20 - db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 21 - if err != nil { 22 - return nil, err 19 + db.SetMaxOpenConns(maxConns) 20 + db.SetMaxIdleConns(maxConns) 21 + db.SetConnMaxIdleTime(time.Hour) 22 + 23 + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) 24 + defer cancel() 25 + if err := db.PingContext(pingCtx); err != nil { 26 + db.Close() 27 + return nil, fmt.Errorf("ping db: %w", err) 23 28 } 24 29 25 30 conn, err := db.Conn(ctx) ··· 47 52 retry_count integer not null default 0, 48 53 retry_after integer not null default 0, 49 54 50 - unique(did, rkey) 55 + constraint repos_pkey primary key (did, rkey) 51 56 ); 52 57 53 58 -- knot hosts 54 59 create table if not exists hosts ( 55 60 hostname text not null, 56 - no_ssl integer not null default 0, 61 + no_ssl boolean not null default false, 57 62 status text not null default 'active', 58 - last_seq integer not null default -1, 63 + last_seq bigint not null default -1, 59 64 60 - unique(hostname) 65 + constraint hosts_pkey primary key (hostname) 61 66 ); 67 + 68 + create index if not exists idx_repos_aturi on repos (at_uri); 62 69 `) 63 70 if err != nil { 64 71 return nil, fmt.Errorf("initializing db schema: %w", err)
+8 -22
knotmirror/db/hosts.go
··· 6 6 "errors" 7 7 "fmt" 8 8 "log" 9 - "strings" 10 9 11 10 "tangled.org/core/knotmirror/models" 12 - "tangled.org/core/orm" 13 11 ) 14 12 15 13 func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error { 16 14 if _, err := e.ExecContext(ctx, 17 15 `insert into hosts (hostname, no_ssl, status, last_seq) 18 - values (?, ?, ?, ?) 16 + values ($1, $2, $3, $4) 19 17 on conflict(hostname) do update set 20 18 no_ssl = excluded.no_ssl, 21 19 status = excluded.status, ··· 35 33 var host models.Host 36 34 if err := e.QueryRowContext(ctx, 37 35 `select hostname, no_ssl, status, last_seq 38 - from hosts where hostname = ?`, 36 + from hosts where hostname = $1`, 39 37 hostname, 40 38 ).Scan( 41 39 &host.Hostname, ··· 62 60 continue 63 61 } 64 62 if _, err := tx.ExecContext(ctx, 65 - `update hosts set last_seq = ? where hostname = ?`, 63 + `update hosts set last_seq = $1 where hostname = $2`, 66 64 cur.LastSeq, 67 65 cur.Hostname, 68 66 ); err != nil { 69 - log.Println("failed to persist host cursor", "host:", cur.Hostname, "lastSeq", cur.LastSeq) 67 + log.Println("failed to persist host cursor", "host", cur.Hostname, "lastSeq", cur.LastSeq, "err", err) 70 68 } 71 69 } 72 70 return tx.Commit() 73 71 } 74 72 75 - func ListHosts(ctx context.Context, e *sql.DB, filters ...orm.Filter) ([]models.Host, error) { 76 - var conditions []string 77 - var args []any 78 - 79 - for _, filter := range filters { 80 - conditions = append(conditions, filter.Condition()) 81 - args = append(args, filter.Arg()...) 82 - } 83 - 84 - whereClause := "" 85 - if len(conditions) > 0 { 86 - whereClause = " where " + strings.Join(conditions, " and ") 87 - } 88 - 73 + func ListHosts(ctx context.Context, e *sql.DB, status models.HostStatus) ([]models.Host, error) { 89 74 rows, err := e.QueryContext(ctx, 90 75 `select hostname, no_ssl, status, last_seq 91 - from hosts` + whereClause, 92 - args..., 76 + from hosts 77 + where status = $1`, 78 + status, 93 79 ) 94 80 if err != nil { 95 81 return nil, fmt.Errorf("querying hosts: %w", err)
+24 -17
knotmirror/db/repos.go
··· 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 10 "tangled.org/core/appview/pagination" 11 11 "tangled.org/core/knotmirror/models" 12 - "tangled.org/core/orm" 13 12 ) 14 13 15 14 func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 16 15 if _, err := e.ExecContext(ctx, 17 16 `insert into repos (did, rkey, cid, name, knot_domain) 18 - values (?, ?, ?, ?, ?)`, 17 + values ($1, $2, $3, $4, $5)`, 19 18 did, rkey, cid, name, knot, 20 19 ); err != nil { 21 20 return fmt.Errorf("inserting repo: %w", err) ··· 26 25 func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 27 26 if _, err := e.ExecContext(ctx, 28 27 `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 29 - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 28 + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 30 29 on conflict(did, rkey) do update set 31 30 cid = excluded.cid, 32 31 name = excluded.name, ··· 58 57 func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 59 58 if _, err := e.ExecContext(ctx, 60 59 `update repos 61 - set state = ? 62 - where did = ? and rkey = ?`, 60 + set state = $1 61 + where did = $2 and rkey = $3`, 63 62 state, 64 63 did, rkey, 65 64 ); err != nil { ··· 70 69 71 70 func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 72 71 if _, err := e.ExecContext(ctx, 73 - `delete from repos where did = ? and rkey = ?`, 72 + `delete from repos where did = $1 and rkey = $2`, 74 73 did, 75 74 rkey, 76 75 ); err != nil { ··· 95 94 retry_count, 96 95 retry_after 97 96 from repos 98 - where did = ? and name = ?`, 97 + where did = $1 and name = $2`, 99 98 did, 100 99 name, 101 100 ).Scan( ··· 135 134 retry_count, 136 135 retry_after 137 136 from repos 138 - where at_uri = ?`, 137 + where at_uri = $1`, 139 138 aturi, 140 139 ).Scan( 141 140 &repo.Did, ··· 158 157 return &repo, nil 159 158 } 160 159 161 - func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) { 160 + func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, did, knot, state string) ([]models.Repo, error) { 162 161 var conditions []string 163 162 var args []any 164 163 165 - for _, filter := range filters { 166 - conditions = append(conditions, filter.Condition()) 167 - args = append(args, filter.Arg()...) 164 + pageClause := "" 165 + if page.Limit > 0 { 166 + pageClause = " limit $1 offset $2 " 167 + args = append(args, page.Limit, page.Offset) 168 168 } 169 169 170 170 whereClause := "" 171 + if did != "" { 172 + conditions = append(conditions, fmt.Sprintf("did = $%d", len(args)+1)) 173 + args = append(args, did) 174 + } 175 + if knot != "" { 176 + conditions = append(conditions, fmt.Sprintf("knot_domain = $%d", len(args)+1)) 177 + args = append(args, knot) 178 + } 179 + if state != "" { 180 + conditions = append(conditions, fmt.Sprintf("state = $%d", len(args)+1)) 181 + args = append(args, state) 182 + } 171 183 if len(conditions) > 0 { 172 184 whereClause = "WHERE " + conditions[0] 173 185 for _, condition := range conditions[1:] { 174 186 whereClause += " AND " + condition 175 187 } 176 - } 177 - pageClause := "" 178 - if page.Limit > 0 { 179 - pageClause = " limit ? offset ? " 180 - args = append(args, page.Limit, page.Offset) 181 188 } 182 189 183 190 query := `
+2 -2
knotmirror/knotmirror.go
··· 28 28 29 29 logger.Debug("config loaded:", "config", cfg) 30 30 31 - db, err := db.Make(ctx, cfg.DbPath) 31 + db, err := db.Make(ctx, cfg.DbPath, 32) 32 32 if err != nil { 33 33 return fmt.Errorf("initializing db: %w", err) 34 34 } 35 35 36 36 res, err := db.ExecContext(ctx, 37 - `update repos set state = ? where state = ?`, 37 + `update repos set state = $1 where state = $2`, 38 38 models.RepoStateDesynchronized, 39 39 models.RepoStateResyncing, 40 40 )
+1 -2
knotmirror/knotstream/knotstream.go
··· 11 11 "tangled.org/core/knotmirror/db" 12 12 "tangled.org/core/knotmirror/models" 13 13 "tangled.org/core/log" 14 - "tangled.org/core/orm" 15 14 ) 16 15 17 16 type KnotStream struct { ··· 71 70 } 72 71 73 72 func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 74 - hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active")) 73 + hosts, err := db.ListHosts(ctx, s.db, models.HostStatusActive) 75 74 if err != nil { 76 75 return fmt.Errorf("listing hosts: %w", err) 77 76 }
+8 -4
knotmirror/resyncer.go
··· 10 10 "net/url" 11 11 "os" 12 12 "path" 13 + "strings" 13 14 "sync" 14 15 "time" 15 16 ··· 87 88 now := time.Now().Unix() 88 89 if err := r.db.QueryRowContext(ctx, 89 90 `update repos 90 - set state = ? 91 + set state = $1 91 92 where at_uri = ( 92 93 select at_uri from repos 93 - where state in (?, ?, ?) 94 - and (retry_after = 0 or retry_after < ?) 94 + where state in ($2, $3, $4) 95 + and (retry_after = 0 or retry_after < $5) 95 96 limit 1 96 97 ) 97 98 returning at_uri ··· 218 219 retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 219 220 } 220 221 222 + // remove null bytes 223 + errMsg = strings.ReplaceAll(errMsg, "\x00", "") 224 + 221 225 repo.State = state 222 226 repo.ErrorMsg = errMsg 223 227 repo.RetryCount = retryCount 224 228 repo.RetryAfter = retryAfter 225 229 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 226 - return dbErr 230 + return fmt.Errorf("failed to update repo state: %w", err) 227 231 } 228 232 return err 229 233 }
+12
nix/gomod2nix.toml
··· 361 361 [mod."github.com/ipfs/go-metrics-interface"] 362 362 version = "v0.3.0" 363 363 hash = "sha256-b3tp3jxecLmJEGx2kW7MiKGlAKPEWg/LJ7hXylSC8jQ=" 364 + [mod."github.com/jackc/pgpassfile"] 365 + version = "v1.0.0" 366 + hash = "sha256-H0nFbC34/3pZUFnuiQk9W7yvAMh6qJDrqvHp+akBPLM=" 367 + [mod."github.com/jackc/pgservicefile"] 368 + version = "v0.0.0-20240606120523-5a60cdf6a761" 369 + hash = "sha256-ETpGsLAA2wcm5xJBayr/mZrCE1YsWbnkbSSX3ptrFn0=" 370 + [mod."github.com/jackc/pgx/v5"] 371 + version = "v5.8.0" 372 + hash = "sha256-Mq5/A/Obcceu6kKxUv30DPC2ZaVvD8Iq/YtmLm1BVec=" 373 + [mod."github.com/jackc/puddle/v2"] 374 + version = "v2.2.2" 375 + hash = "sha256-IUxdu4JYfsCh/qlz2SiUWu7EVPHhyooiVA4oaS2Z6yk=" 364 376 [mod."github.com/json-iterator/go"] 365 377 version = "v1.1.12" 366 378 hash = "sha256-To8A0h+lbfZ/6zM+2PpRpY3+L6725OPC66lffq6fUoM="
+4 -4
nix/pkgs/knot-mirror.nix
··· 1 1 { 2 2 buildGoApplication, 3 3 modules, 4 - sqlite-lib, 4 + # sqlite-lib, 5 5 src, 6 6 }: 7 7 buildGoApplication { ··· 12 12 doCheck = false; 13 13 14 14 subPackages = ["cmd/knotmirror"]; 15 - tags = ["libsqlite3"]; 15 + # tags = ["libsqlite3"]; 16 16 17 - env.CGO_CFLAGS = "-I ${sqlite-lib}/include "; 18 - env.CGO_LDFLAGS = "-L ${sqlite-lib}/lib"; 17 + # env.CGO_CFLAGS = "-I ${sqlite-lib}/include "; 18 + # env.CGO_LDFLAGS = "-L ${sqlite-lib}/lib"; 19 19 CGO_ENABLED = 1; 20 20 }