Monorepo for Tangled tangled.org

appview: add basic issue indexer (wip) #494

merged opened by boltless.me targeting master from boltless.me/core: feat/search
  • Heavily inspired by gitea
  • add GetAllIssues which only receives a paginator and gathers all issues ignoring repoAt field

Signed-off-by: Seongmin Lee boltlessengineer@proton.me

Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3lwecnvoz4z22
+391
Diff #5
+1
.gitignore
··· 15 .env 16 *.rdb 17 .envrc 18 # Created if following hacking.md 19 genjwks.out 20 /nix/vm-data
··· 15 .env 16 *.rdb 17 .envrc 18 + *.bleve 19 # Created if following hacking.md 20 genjwks.out 21 /nix/vm-data
+20
appview/indexer/base36/base36.go
···
··· 1 + // mostly copied from gitea/modules/indexer/internal/base32 2 + 3 + package base36 4 + 5 + import ( 6 + "fmt" 7 + "strconv" 8 + ) 9 + 10 + func Encode(i int64) string { 11 + return strconv.FormatInt(i, 36) 12 + } 13 + 14 + func Decode(s string) (int64, error) { 15 + i, err := strconv.ParseInt(s, 36, 64) 16 + if err != nil { 17 + return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err) 18 + } 19 + return i, nil 20 + }
+58
appview/indexer/bleve/batch.go
···
··· 1 + // Copyright 2021 The Gitea Authors. All rights reserved. 2 + // SPDX-License-Identifier: MIT 3 + 4 + package bleveutil 5 + 6 + import ( 7 + "github.com/blevesearch/bleve/v2" 8 + ) 9 + 10 + // FlushingBatch is a batch of operations that automatically flushes to the 11 + // underlying index once it reaches a certain size. 12 + type FlushingBatch struct { 13 + maxBatchSize int 14 + batch *bleve.Batch 15 + index bleve.Index 16 + } 17 + 18 + // NewFlushingBatch creates a new flushing batch for the specified index. Once 19 + // the number of operations in the batch reaches the specified limit, the batch 20 + // automatically flushes its operations to the index. 21 + func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { 22 + return &FlushingBatch{ 23 + maxBatchSize: maxBatchSize, 24 + batch: index.NewBatch(), 25 + index: index, 26 + } 27 + } 28 + 29 + // Index add a new index to batch 30 + func (b *FlushingBatch) Index(id string, data any) error { 31 + if err := b.batch.Index(id, data); err != nil { 32 + return err 33 + } 34 + return b.flushIfFull() 35 + } 36 + 37 + // Delete add a delete index to batch 38 + func (b *FlushingBatch) Delete(id string) error { 39 + b.batch.Delete(id) 40 + return b.flushIfFull() 41 + } 42 + 43 + func (b *FlushingBatch) flushIfFull() error { 44 + if b.batch.Size() < b.maxBatchSize { 45 + return nil 46 + } 47 + return b.Flush() 48 + } 49 + 50 + // Flush submit the batch and create a new one 51 + func (b *FlushingBatch) Flush() error { 52 + err := b.index.Batch(b.batch) 53 + if err != nil { 54 + return err 55 + } 56 + b.batch = b.index.NewBatch() 57 + return nil 58 + }
+32
appview/indexer/indexer.go
···
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + 7 + "tangled.org/core/appview/db" 8 + issues_indexer "tangled.org/core/appview/indexer/issues" 9 + "tangled.org/core/appview/notify" 10 + tlog "tangled.org/core/log" 11 + ) 12 + 13 + type Indexer struct { 14 + Issues *issues_indexer.Indexer 15 + logger *slog.Logger 16 + notify.BaseNotifier 17 + } 18 + 19 + func New(logger *slog.Logger) *Indexer { 20 + return &Indexer { 21 + issues_indexer.NewIndexer("indexes.bleve"), 22 + logger, 23 + notify.BaseNotifier{}, 24 + } 25 + } 26 + 27 + // Init initializes all indexers 28 + func (ix *Indexer) Init(ctx context.Context, db *db.DB) error { 29 + ctx = tlog.IntoContext(ctx, ix.logger) 30 + ix.Issues.Init(ctx, db) 31 + return nil 32 + }
+197
appview/indexer/issues/indexer.go
···
··· 1 + // heavily inspired by gitea's model (basically copy-pasted) 2 + package issues_indexer 3 + 4 + import ( 5 + "context" 6 + "errors" 7 + "os" 8 + 9 + "github.com/blevesearch/bleve/v2" 10 + "github.com/blevesearch/bleve/v2/index/upsidedown" 11 + "github.com/blevesearch/bleve/v2/search/query" 12 + "tangled.org/core/appview/db" 13 + "tangled.org/core/appview/indexer/base36" 14 + "tangled.org/core/appview/indexer/bleve" 15 + "tangled.org/core/appview/models" 16 + "tangled.org/core/appview/pagination" 17 + tlog "tangled.org/core/log" 18 + ) 19 + 20 + type Indexer struct { 21 + indexer bleve.Index 22 + path string 23 + } 24 + 25 + func NewIndexer(indexDir string) *Indexer { 26 + return &Indexer{ 27 + path: indexDir, 28 + } 29 + } 30 + 31 + // Init initializes the indexer 32 + func (ix *Indexer) Init(ctx context.Context, e db.Execer) { 33 + l := tlog.FromContext(ctx) 34 + existed, err := ix.intialize(ctx) 35 + if err != nil { 36 + l.Error("failed to initialize issue indexer", "err", err) 37 + } 38 + if !existed { 39 + l.Debug("Populating the issue indexer") 40 + err := PopulateIndexer(ctx, ix, e) 41 + if err != nil { 42 + l.Error("failed to populate issue indexer", "err", err) 43 + } 44 + } 45 + l.Info("Initialized the issue indexer") 46 + } 47 + 48 + func (ix *Indexer) intialize(ctx context.Context) (bool, error) { 49 + if ix.indexer != nil { 50 + return false, errors.New("indexer is already initialized") 51 + } 52 + 53 + indexer, err := openIndexer(ctx, ix.path) 54 + if err != nil { 55 + return false, err 56 + } 57 + if indexer != nil { 58 + ix.indexer = indexer 59 + return true, nil 60 + } 61 + 62 + mapping := bleve.NewIndexMapping() 63 + indexer, err = bleve.New(ix.path, mapping) 64 + if err != nil { 65 + return false, err 66 + } 67 + 68 + ix.indexer = indexer 69 + 70 + return false, nil 71 + } 72 + 73 + func openIndexer(ctx context.Context, path string) (bleve.Index, error) { 74 + l := tlog.FromContext(ctx) 75 + indexer, err := bleve.Open(path) 76 + if err != nil { 77 + if errors.Is(err, upsidedown.IncompatibleVersion) { 78 + l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") 79 + return nil, os.RemoveAll(path) 80 + } 81 + return nil, nil 82 + } 83 + return indexer, nil 84 + } 85 + 86 + func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { 87 + l := tlog.FromContext(ctx) 88 + count := 0 89 + err := pagination.IterateAll( 90 + func(page pagination.Page) ([]models.Issue, error) { 91 + return db.GetIssuesPaginated(e, page) 92 + }, 93 + func(issues []models.Issue) error { 94 + count += len(issues) 95 + return ix.Index(ctx, issues...) 96 + }, 97 + ) 98 + l.Info("issues indexed", "count", count) 99 + return err 100 + } 101 + 102 + // issueData data stored and will be indexed 103 + type issueData struct { 104 + ID int64 `json:"id"` 105 + RepoAt string `json:"repo_at"` 106 + IssueID int `json:"issue_id"` 107 + Title string `json:"title"` 108 + Body string `json:"body"` 109 + 110 + IsOpen bool `json:"is_open"` 111 + Comments []IssueCommentData `json:"comments"` 112 + } 113 + 114 + func makeIssueData(issue *models.Issue) issueData { 115 + return issueData{ 116 + ID: issue.Id, 117 + RepoAt: issue.RepoAt.String(), 118 + IssueID: issue.IssueId, 119 + Title: issue.Title, 120 + Body: issue.Body, 121 + IsOpen: issue.Open, 122 + } 123 + } 124 + 125 + type IssueCommentData struct { 126 + Body string `json:"body"` 127 + } 128 + 129 + type SearchResult struct { 130 + Hits []int64 131 + Total uint64 132 + } 133 + 134 + const maxBatchSize = 20 135 + 136 + func (ix *Indexer) Index(ctx context.Context, issues ...models.Issue) error { 137 + batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) 138 + for _, issue := range issues { 139 + issueData := makeIssueData(&issue) 140 + if err := batch.Index(base36.Encode(issue.Id), issueData); err != nil { 141 + return err 142 + } 143 + } 144 + return batch.Flush() 145 + } 146 + 147 + // Search searches for issues 148 + func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) { 149 + var queries []query.Query 150 + 151 + if opts.Keyword != "" { 152 + queries = append(queries, bleve.NewDisjunctionQuery( 153 + matchAndQuery(opts.Keyword, "title"), 154 + matchAndQuery(opts.Keyword, "body"), 155 + )) 156 + } 157 + queries = append(queries, keywordFieldQuery(opts.RepoAt, "repo_at")) 158 + queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open")) 159 + // TODO: append more queries 160 + 161 + var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...) 162 + searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false) 163 + res, err := ix.indexer.SearchInContext(ctx, searchReq) 164 + if err != nil { 165 + return nil, nil 166 + } 167 + ret := &SearchResult{ 168 + Total: res.Total, 169 + Hits: make([]int64, len(res.Hits)), 170 + } 171 + for i, hit := range res.Hits { 172 + id, err := base36.Decode(hit.ID) 173 + if err != nil { 174 + return nil, err 175 + } 176 + ret.Hits[i] = id 177 + } 178 + return ret, nil 179 + } 180 + 181 + func matchAndQuery(keyword, field string) query.Query { 182 + q := bleve.NewMatchQuery(keyword) 183 + q.FieldVal = field 184 + return q 185 + } 186 + 187 + func boolFieldQuery(val bool, field string) query.Query { 188 + q := bleve.NewBoolFieldQuery(val) 189 + q.FieldVal = field 190 + return q 191 + } 192 + 193 + func keywordFieldQuery(keyword, field string) query.Query { 194 + q := bleve.NewTermQuery(keyword) 195 + q.FieldVal = field 196 + return q 197 + }
+20
appview/indexer/notifier.go
···
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + 6 + "tangled.org/core/appview/models" 7 + "tangled.org/core/appview/notify" 8 + "tangled.org/core/log" 9 + ) 10 + 11 + var _ notify.Notifier = &Indexer{} 12 + 13 + func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) { 14 + l := log.FromContext(ctx).With("notifier", "indexer.NewIssue", "issue", issue) 15 + l.Debug("indexing new issue") 16 + err := ix.Issues.Index(ctx, *issue) 17 + if err != nil { 18 + l.Error("failed to index an issue", "err", err) 19 + } 20 + }
+5
appview/issues/issues.go
··· 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/appview/config" 21 "tangled.org/core/appview/db" 22 "tangled.org/core/appview/models" 23 "tangled.org/core/appview/notify" 24 "tangled.org/core/appview/oauth" ··· 40 notifier notify.Notifier 41 logger *slog.Logger 42 validator *validator.Validator 43 } 44 45 func New( ··· 51 config *config.Config, 52 notifier notify.Notifier, 53 validator *validator.Validator, 54 logger *slog.Logger, 55 ) *Issues { 56 return &Issues{ ··· 63 notifier: notifier, 64 logger: logger, 65 validator: validator, 66 } 67 } 68 ··· 847 Rkey: tid.TID(), 848 Title: r.FormValue("title"), 849 Body: r.FormValue("body"), 850 Did: user.Did, 851 Created: time.Now(), 852 Repo: &f.Repo,
··· 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/appview/config" 21 "tangled.org/core/appview/db" 22 + issues_indexer "tangled.org/core/appview/indexer/issues" 23 "tangled.org/core/appview/models" 24 "tangled.org/core/appview/notify" 25 "tangled.org/core/appview/oauth" ··· 41 notifier notify.Notifier 42 logger *slog.Logger 43 validator *validator.Validator 44 + indexer *issues_indexer.Indexer 45 } 46 47 func New( ··· 53 config *config.Config, 54 notifier notify.Notifier, 55 validator *validator.Validator, 56 + indexer *issues_indexer.Indexer, 57 logger *slog.Logger, 58 ) *Issues { 59 return &Issues{ ··· 66 notifier: notifier, 67 logger: logger, 68 validator: validator, 69 + indexer: indexer, 70 } 71 } 72 ··· 851 Rkey: tid.TID(), 852 Title: r.FormValue("title"), 853 Body: r.FormValue("body"), 854 + Open: true, 855 Did: user.Did, 856 Created: time.Now(), 857 Repo: &f.Repo,
+23
appview/models/search.go
···
··· 1 + package models 2 + 3 + import "tangled.org/core/appview/pagination" 4 + 5 + type IssueSearchOptions struct { 6 + Keyword string 7 + RepoAt string 8 + IsOpen bool 9 + 10 + Page pagination.Page 11 + } 12 + 13 + // func (so *SearchOptions) ToFilters() []filter { 14 + // var filters []filter 15 + // if so.IsOpen != nil { 16 + // openValue := 0 17 + // if *so.IsOpen { 18 + // openValue = 1 19 + // } 20 + // filters = append(filters, FilterEq("open", openValue)) 21 + // } 22 + // return filters 23 + // }
+1
appview/pages/pages.go
··· 970 LabelDefs map[string]*models.LabelDefinition 971 Page pagination.Page 972 FilteringByOpen bool 973 } 974 975 func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {
··· 970 LabelDefs map[string]*models.LabelDefinition 971 Page pagination.Page 972 FilteringByOpen bool 973 + FilterQuery string 974 } 975 976 func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {
+23
appview/pagination/page.go
··· 29 Limit: p.Limit, 30 } 31 }
··· 29 Limit: p.Limit, 30 } 31 } 32 + 33 + func IterateAll[T any]( 34 + fetch func(page Page) ([]T, error), 35 + handle func(items []T) error, 36 + ) error { 37 + page := FirstPage() 38 + for { 39 + items, err := fetch(page) 40 + if err != nil { 41 + return err 42 + } 43 + 44 + err = handle(items) 45 + if err != nil { 46 + return err 47 + } 48 + if len(items) < page.Limit { 49 + break 50 + } 51 + page = page.Next() 52 + } 53 + return nil 54 + }
+1
appview/state/router.go
··· 262 s.config, 263 s.notifier, 264 s.validator, 265 log.SubLogger(s.logger, "issues"), 266 ) 267 return issues.Router(mw)
··· 262 s.config, 263 s.notifier, 264 s.validator, 265 + s.indexer.Issues, 266 log.SubLogger(s.logger, "issues"), 267 ) 268 return issues.Router(mw)
+10
appview/state/state.go
··· 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/models" 18 "tangled.org/core/appview/notify" 19 dbnotify "tangled.org/core/appview/notify/db" ··· 43 type State struct { 44 db *db.DB 45 notifier notify.Notifier 46 oauth *oauth.OAuth 47 enforcer *rbac.Enforcer 48 pages *pages.Pages ··· 65 return nil, fmt.Errorf("failed to create db: %w", err) 66 } 67 68 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 69 if err != nil { 70 return nil, fmt.Errorf("failed to create enforcer: %w", err) ··· 159 if !config.Core.Dev { 160 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 161 } 162 notifier := notify.NewMergedNotifier(notifiers...) 163 164 state := &State{ 165 d, 166 notifier, 167 oauth, 168 enforcer, 169 pages,
··· 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 + "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 dbnotify "tangled.org/core/appview/notify/db" ··· 44 type State struct { 45 db *db.DB 46 notifier notify.Notifier 47 + indexer *indexer.Indexer 48 oauth *oauth.OAuth 49 enforcer *rbac.Enforcer 50 pages *pages.Pages ··· 67 return nil, fmt.Errorf("failed to create db: %w", err) 68 } 69 70 + indexer := indexer.New(log.SubLogger(logger, "indexer")) 71 + err = indexer.Init(ctx, d) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to create indexer: %w", err) 74 + } 75 + 76 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 77 if err != nil { 78 return nil, fmt.Errorf("failed to create enforcer: %w", err) ··· 167 if !config.Core.Dev { 168 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 169 } 170 + notifiers = append(notifiers, indexer) 171 notifier := notify.NewMergedNotifier(notifiers...) 172 173 state := &State{ 174 d, 175 notifier, 176 + indexer, 177 oauth, 178 enforcer, 179 pages,

History

15 rounds 2 comments
sign up or login to add to the discussion
1 commit
expand
appview: add basic issue indexer
expand 0 comments
pull request successfully merged
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment

this changeset largely lgtm!

1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment
  • this is an N+1 query, because we first get all issues and then make one query for each issue. understandably this only needs to happen when populating the indices but we could improve this by just making one query
  • we can use a pagination free API for getting issues with comments while populating indexer
  • more of a note to self: search index should also handle issue deletions
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer (wip)
expand 0 comments