Monorepo for Tangled tangled.org

appview: add basic issue indexer (wip) #494

merged opened by boltless.me targeting master from boltless.me/core: feat/search
  • Heavily inspired by gitea
  • add GetAllIssues which only receives a paginator and gathers all issues ignoring repoAt field

Signed-off-by: Seongmin Lee boltlessengineer@proton.me

Labels

None yet.

Assignees

None yet.

Participants 2
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3lwecnvoz4z22
+391
Diff #7
+1
.gitignore
··· 15 15 .env 16 16 *.rdb 17 17 .envrc 18 + *.bleve 18 19 # Created if following hacking.md 19 20 genjwks.out 20 21 /nix/vm-data
+20
appview/indexer/base36/base36.go
// base36 converts int64 document ids to and from their base-36 string
// form for use as index keys (mostly copied from
// gitea/modules/indexer/internal/base32).

// Encode renders i as a (lowercase) base-36 string.
func Encode(i int64) string {
	return strconv.FormatInt(i, 36)
}

// Decode parses a base-36 string back into an int64, wrapping any
// parse failure with the offending input for easier debugging.
func Decode(s string) (int64, error) {
	v, err := strconv.ParseInt(s, 36, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err)
	}
	return v, nil
}
+58
appview/indexer/bleve/batch.go
··· 1 + // Copyright 2021 The Gitea Authors. All rights reserved. 2 + // SPDX-License-Identifier: MIT 3 + 4 + package bleveutil 5 + 6 + import ( 7 + "github.com/blevesearch/bleve/v2" 8 + ) 9 + 10 + // FlushingBatch is a batch of operations that automatically flushes to the 11 + // underlying index once it reaches a certain size. 12 + type FlushingBatch struct { 13 + maxBatchSize int 14 + batch *bleve.Batch 15 + index bleve.Index 16 + } 17 + 18 + // NewFlushingBatch creates a new flushing batch for the specified index. Once 19 + // the number of operations in the batch reaches the specified limit, the batch 20 + // automatically flushes its operations to the index. 21 + func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { 22 + return &FlushingBatch{ 23 + maxBatchSize: maxBatchSize, 24 + batch: index.NewBatch(), 25 + index: index, 26 + } 27 + } 28 + 29 + // Index add a new index to batch 30 + func (b *FlushingBatch) Index(id string, data any) error { 31 + if err := b.batch.Index(id, data); err != nil { 32 + return err 33 + } 34 + return b.flushIfFull() 35 + } 36 + 37 + // Delete add a delete index to batch 38 + func (b *FlushingBatch) Delete(id string) error { 39 + b.batch.Delete(id) 40 + return b.flushIfFull() 41 + } 42 + 43 + func (b *FlushingBatch) flushIfFull() error { 44 + if b.batch.Size() < b.maxBatchSize { 45 + return nil 46 + } 47 + return b.Flush() 48 + } 49 + 50 + // Flush submit the batch and create a new one 51 + func (b *FlushingBatch) Flush() error { 52 + err := b.index.Batch(b.batch) 53 + if err != nil { 54 + return err 55 + } 56 + b.batch = b.index.NewBatch() 57 + return nil 58 + }
+32
appview/indexer/indexer.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + 7 + "tangled.org/core/appview/db" 8 + issues_indexer "tangled.org/core/appview/indexer/issues" 9 + "tangled.org/core/appview/notify" 10 + tlog "tangled.org/core/log" 11 + ) 12 + 13 + type Indexer struct { 14 + Issues *issues_indexer.Indexer 15 + logger *slog.Logger 16 + notify.BaseNotifier 17 + } 18 + 19 + func New(logger *slog.Logger) *Indexer { 20 + return &Indexer { 21 + issues_indexer.NewIndexer("indexes.bleve"), 22 + logger, 23 + notify.BaseNotifier{}, 24 + } 25 + } 26 + 27 + // Init initializes all indexers 28 + func (ix *Indexer) Init(ctx context.Context, db *db.DB) error { 29 + ctx = tlog.IntoContext(ctx, ix.logger) 30 + ix.Issues.Init(ctx, db) 31 + return nil 32 + }
+197
appview/indexer/issues/indexer.go
··· 1 + // heavily inspired by gitea's model (basically copy-pasted) 2 + package issues_indexer 3 + 4 + import ( 5 + "context" 6 + "errors" 7 + "os" 8 + 9 + "github.com/blevesearch/bleve/v2" 10 + "github.com/blevesearch/bleve/v2/index/upsidedown" 11 + "github.com/blevesearch/bleve/v2/search/query" 12 + "tangled.org/core/appview/db" 13 + "tangled.org/core/appview/indexer/base36" 14 + "tangled.org/core/appview/indexer/bleve" 15 + "tangled.org/core/appview/models" 16 + "tangled.org/core/appview/pagination" 17 + tlog "tangled.org/core/log" 18 + ) 19 + 20 + type Indexer struct { 21 + indexer bleve.Index 22 + path string 23 + } 24 + 25 + func NewIndexer(indexDir string) *Indexer { 26 + return &Indexer{ 27 + path: indexDir, 28 + } 29 + } 30 + 31 + // Init initializes the indexer 32 + func (ix *Indexer) Init(ctx context.Context, e db.Execer) { 33 + l := tlog.FromContext(ctx) 34 + existed, err := ix.intialize(ctx) 35 + if err != nil { 36 + l.Error("failed to initialize issue indexer", "err", err) 37 + } 38 + if !existed { 39 + l.Debug("Populating the issue indexer") 40 + err := PopulateIndexer(ctx, ix, e) 41 + if err != nil { 42 + l.Error("failed to populate issue indexer", "err", err) 43 + } 44 + } 45 + l.Info("Initialized the issue indexer") 46 + } 47 + 48 + func (ix *Indexer) intialize(ctx context.Context) (bool, error) { 49 + if ix.indexer != nil { 50 + return false, errors.New("indexer is already initialized") 51 + } 52 + 53 + indexer, err := openIndexer(ctx, ix.path) 54 + if err != nil { 55 + return false, err 56 + } 57 + if indexer != nil { 58 + ix.indexer = indexer 59 + return true, nil 60 + } 61 + 62 + mapping := bleve.NewIndexMapping() 63 + indexer, err = bleve.New(ix.path, mapping) 64 + if err != nil { 65 + return false, err 66 + } 67 + 68 + ix.indexer = indexer 69 + 70 + return false, nil 71 + } 72 + 73 + func openIndexer(ctx context.Context, path string) (bleve.Index, error) { 74 + l := tlog.FromContext(ctx) 75 + indexer, err := bleve.Open(path) 76 + if err != 
nil { 77 + if errors.Is(err, upsidedown.IncompatibleVersion) { 78 + l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") 79 + return nil, os.RemoveAll(path) 80 + } 81 + return nil, nil 82 + } 83 + return indexer, nil 84 + } 85 + 86 + func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { 87 + l := tlog.FromContext(ctx) 88 + count := 0 89 + err := pagination.IterateAll( 90 + func(page pagination.Page) ([]models.Issue, error) { 91 + return db.GetIssuesPaginated(e, page) 92 + }, 93 + func(issues []models.Issue) error { 94 + count += len(issues) 95 + return ix.Index(ctx, issues...) 96 + }, 97 + ) 98 + l.Info("issues indexed", "count", count) 99 + return err 100 + } 101 + 102 + // issueData data stored and will be indexed 103 + type issueData struct { 104 + ID int64 `json:"id"` 105 + RepoAt string `json:"repo_at"` 106 + IssueID int `json:"issue_id"` 107 + Title string `json:"title"` 108 + Body string `json:"body"` 109 + 110 + IsOpen bool `json:"is_open"` 111 + Comments []IssueCommentData `json:"comments"` 112 + } 113 + 114 + func makeIssueData(issue *models.Issue) *issueData { 115 + return &issueData{ 116 + ID: issue.Id, 117 + RepoAt: issue.RepoAt.String(), 118 + IssueID: issue.IssueId, 119 + Title: issue.Title, 120 + Body: issue.Body, 121 + IsOpen: issue.Open, 122 + } 123 + } 124 + 125 + type IssueCommentData struct { 126 + Body string `json:"body"` 127 + } 128 + 129 + type SearchResult struct { 130 + Hits []int64 131 + Total uint64 132 + } 133 + 134 + const maxBatchSize = 20 135 + 136 + func (ix *Indexer) Index(ctx context.Context, issues ...models.Issue) error { 137 + batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) 138 + for _, issue := range issues { 139 + issueData := makeIssueData(&issue) 140 + if err := batch.Index(base36.Encode(issue.Id), issueData); err != nil { 141 + return err 142 + } 143 + } 144 + return batch.Flush() 145 + } 146 + 147 + // Search searches for issues 148 + func (ix *Indexer) 
Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) { 149 + var queries []query.Query 150 + 151 + if opts.Keyword != "" { 152 + queries = append(queries, bleve.NewDisjunctionQuery( 153 + matchAndQuery(opts.Keyword, "title"), 154 + matchAndQuery(opts.Keyword, "body"), 155 + )) 156 + } 157 + queries = append(queries, keywordFieldQuery(opts.RepoAt, "repo_at")) 158 + queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open")) 159 + // TODO: append more queries 160 + 161 + var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...) 162 + searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false) 163 + res, err := ix.indexer.SearchInContext(ctx, searchReq) 164 + if err != nil { 165 + return nil, nil 166 + } 167 + ret := &SearchResult{ 168 + Total: res.Total, 169 + Hits: make([]int64, len(res.Hits)), 170 + } 171 + for i, hit := range res.Hits { 172 + id, err := base36.Decode(hit.ID) 173 + if err != nil { 174 + return nil, err 175 + } 176 + ret.Hits[i] = id 177 + } 178 + return ret, nil 179 + } 180 + 181 + func matchAndQuery(keyword, field string) query.Query { 182 + q := bleve.NewMatchQuery(keyword) 183 + q.FieldVal = field 184 + return q 185 + } 186 + 187 + func boolFieldQuery(val bool, field string) query.Query { 188 + q := bleve.NewBoolFieldQuery(val) 189 + q.FieldVal = field 190 + return q 191 + } 192 + 193 + func keywordFieldQuery(keyword, field string) query.Query { 194 + q := bleve.NewTermQuery(keyword) 195 + q.FieldVal = field 196 + return q 197 + }
+20
appview/indexer/notifier.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + 6 + "tangled.org/core/appview/models" 7 + "tangled.org/core/appview/notify" 8 + "tangled.org/core/log" 9 + ) 10 + 11 + var _ notify.Notifier = &Indexer{} 12 + 13 + func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) { 14 + l := log.FromContext(ctx).With("notifier", "indexer.NewIssue", "issue", issue) 15 + l.Debug("indexing new issue") 16 + err := ix.Issues.Index(ctx, *issue) 17 + if err != nil { 18 + l.Error("failed to index an issue", "err", err) 19 + } 20 + }
+5
appview/issues/issues.go
··· 19 19 "tangled.org/core/api/tangled" 20 20 "tangled.org/core/appview/config" 21 21 "tangled.org/core/appview/db" 22 + issues_indexer "tangled.org/core/appview/indexer/issues" 22 23 "tangled.org/core/appview/models" 23 24 "tangled.org/core/appview/notify" 24 25 "tangled.org/core/appview/oauth" ··· 40 41 notifier notify.Notifier 41 42 logger *slog.Logger 42 43 validator *validator.Validator 44 + indexer *issues_indexer.Indexer 43 45 } 44 46 45 47 func New( ··· 51 53 config *config.Config, 52 54 notifier notify.Notifier, 53 55 validator *validator.Validator, 56 + indexer *issues_indexer.Indexer, 54 57 logger *slog.Logger, 55 58 ) *Issues { 56 59 return &Issues{ ··· 63 66 notifier: notifier, 64 67 logger: logger, 65 68 validator: validator, 69 + indexer: indexer, 66 70 } 67 71 } 68 72 ··· 847 851 Rkey: tid.TID(), 848 852 Title: r.FormValue("title"), 849 853 Body: r.FormValue("body"), 854 + Open: true, 850 855 Did: user.Did, 851 856 Created: time.Now(), 852 857 Repo: &f.Repo,
+23
appview/models/search.go
··· 1 + package models 2 + 3 + import "tangled.org/core/appview/pagination" 4 + 5 + type IssueSearchOptions struct { 6 + Keyword string 7 + RepoAt string 8 + IsOpen bool 9 + 10 + Page pagination.Page 11 + } 12 + 13 + // func (so *SearchOptions) ToFilters() []filter { 14 + // var filters []filter 15 + // if so.IsOpen != nil { 16 + // openValue := 0 17 + // if *so.IsOpen { 18 + // openValue = 1 19 + // } 20 + // filters = append(filters, FilterEq("open", openValue)) 21 + // } 22 + // return filters 23 + // }
+1
appview/pages/pages.go
··· 970 970 LabelDefs map[string]*models.LabelDefinition 971 971 Page pagination.Page 972 972 FilteringByOpen bool 973 + FilterQuery string 973 974 } 974 975 975 976 func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {
+23
appview/pagination/page.go
··· 29 29 Limit: p.Limit, 30 30 } 31 31 } 32 + 33 + func IterateAll[T any]( 34 + fetch func(page Page) ([]T, error), 35 + handle func(items []T) error, 36 + ) error { 37 + page := FirstPage() 38 + for { 39 + items, err := fetch(page) 40 + if err != nil { 41 + return err 42 + } 43 + 44 + err = handle(items) 45 + if err != nil { 46 + return err 47 + } 48 + if len(items) < page.Limit { 49 + break 50 + } 51 + page = page.Next() 52 + } 53 + return nil 54 + }
+1
appview/state/router.go
··· 262 262 s.config, 263 263 s.notifier, 264 264 s.validator, 265 + s.indexer.Issues, 265 266 log.SubLogger(s.logger, "issues"), 266 267 ) 267 268 return issues.Router(mw)
+10
appview/state/state.go
··· 14 14 "tangled.org/core/appview" 15 15 "tangled.org/core/appview/config" 16 16 "tangled.org/core/appview/db" 17 + "tangled.org/core/appview/indexer" 17 18 "tangled.org/core/appview/models" 18 19 "tangled.org/core/appview/notify" 19 20 dbnotify "tangled.org/core/appview/notify/db" ··· 43 44 type State struct { 44 45 db *db.DB 45 46 notifier notify.Notifier 47 + indexer *indexer.Indexer 46 48 oauth *oauth.OAuth 47 49 enforcer *rbac.Enforcer 48 50 pages *pages.Pages ··· 65 67 return nil, fmt.Errorf("failed to create db: %w", err) 66 68 } 67 69 70 + indexer := indexer.New(log.SubLogger(logger, "indexer")) 71 + err = indexer.Init(ctx, d) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to create indexer: %w", err) 74 + } 75 + 68 76 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 69 77 if err != nil { 70 78 return nil, fmt.Errorf("failed to create enforcer: %w", err) ··· 159 167 if !config.Core.Dev { 160 168 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 161 169 } 170 + notifiers = append(notifiers, indexer) 162 171 notifier := notify.NewMergedNotifier(notifiers...) 163 172 164 173 state := &State{ 165 174 d, 166 175 notifier, 176 + indexer, 167 177 oauth, 168 178 enforcer, 169 179 pages,

History

15 rounds 2 comments
sign up or login to add to the discussion
1 commit
expand
appview: add basic issue indexer
expand 0 comments
pull request successfully merged
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment

this changeset largely lgtm!

1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment
  • this is an N+1 query, because we first get all issues and then make one query for each issue. understandably this only needs to happen when populating the indices but we could improve this by just making one query
  • we can use a pagination free API for getting issues with comments while populating indexer
  • more of a note to self: search index should also handle issue deletions
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer (wip)
expand 0 comments