Monorepo for Tangled tangled.org

appview: add basic issue indexer (wip) #494

merged — opened by boltless.me, targeting master from boltless.me/core:feat/search
  • Heavily inspired by gitea
  • add GetAllIssues which only receives a paginator and gathers all issues ignoring repoAt field

Signed-off-by: Seongmin Lee boltlessengineer@proton.me

Labels

None yet.

Assignees

None yet.

Participants 2
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3lwecnvoz4z22
+388
Diff #4
+1
.gitignore
··· 15 15 .env 16 16 *.rdb 17 17 .envrc 18 + *.bleve 18 19 # Created if following hacking.md 19 20 genjwks.out 20 21 /nix/vm-data
+20
appview/indexer/base36/base36.go
// mostly copied from gitea/modules/indexer/internal/base32

// Encode renders i as a base-36 string (digits 0-9a-z), the compact
// form used for bleve document IDs.
func Encode(i int64) string {
	return strconv.FormatInt(i, 36)
}

// Decode parses a base-36 string produced by Encode back into an int64.
// It returns a descriptive error for inputs that are not valid base-36.
func Decode(s string) (int64, error) {
	v, err := strconv.ParseInt(s, 36, 64)
	if err == nil {
		return v, nil
	}
	return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err)
}
+58
appview/indexer/bleve/batch.go
··· 1 + // Copyright 2021 The Gitea Authors. All rights reserved. 2 + // SPDX-License-Identifier: MIT 3 + 4 + package bleveutil 5 + 6 + import ( 7 + "github.com/blevesearch/bleve/v2" 8 + ) 9 + 10 + // FlushingBatch is a batch of operations that automatically flushes to the 11 + // underlying index once it reaches a certain size. 12 + type FlushingBatch struct { 13 + maxBatchSize int 14 + batch *bleve.Batch 15 + index bleve.Index 16 + } 17 + 18 + // NewFlushingBatch creates a new flushing batch for the specified index. Once 19 + // the number of operations in the batch reaches the specified limit, the batch 20 + // automatically flushes its operations to the index. 21 + func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { 22 + return &FlushingBatch{ 23 + maxBatchSize: maxBatchSize, 24 + batch: index.NewBatch(), 25 + index: index, 26 + } 27 + } 28 + 29 + // Index add a new index to batch 30 + func (b *FlushingBatch) Index(id string, data any) error { 31 + if err := b.batch.Index(id, data); err != nil { 32 + return err 33 + } 34 + return b.flushIfFull() 35 + } 36 + 37 + // Delete add a delete index to batch 38 + func (b *FlushingBatch) Delete(id string) error { 39 + b.batch.Delete(id) 40 + return b.flushIfFull() 41 + } 42 + 43 + func (b *FlushingBatch) flushIfFull() error { 44 + if b.batch.Size() < b.maxBatchSize { 45 + return nil 46 + } 47 + return b.Flush() 48 + } 49 + 50 + // Flush submit the batch and create a new one 51 + func (b *FlushingBatch) Flush() error { 52 + err := b.index.Batch(b.batch) 53 + if err != nil { 54 + return err 55 + } 56 + b.batch = b.index.NewBatch() 57 + return nil 58 + }
+32
appview/indexer/indexer.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + 7 + "tangled.org/core/appview/db" 8 + issues_indexer "tangled.org/core/appview/indexer/issues" 9 + "tangled.org/core/appview/notify" 10 + tlog "tangled.org/core/log" 11 + ) 12 + 13 + type Indexer struct { 14 + Issues *issues_indexer.Indexer 15 + logger *slog.Logger 16 + notify.BaseNotifier 17 + } 18 + 19 + func New(logger *slog.Logger) *Indexer { 20 + return &Indexer { 21 + issues_indexer.NewIndexer("indexes.bleve"), 22 + logger, 23 + notify.BaseNotifier{}, 24 + } 25 + } 26 + 27 + // Init initializes all indexers 28 + func (ix *Indexer) Init(ctx context.Context, db *db.DB) error { 29 + ctx = tlog.IntoContext(ctx, ix.logger) 30 + ix.Issues.Init(ctx, db) 31 + return nil 32 + }
+189
appview/indexer/issues/indexer.go
··· 1 + package issues_indexer 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "os" 7 + 8 + "github.com/blevesearch/bleve/v2" 9 + "github.com/blevesearch/bleve/v2/index/upsidedown" 10 + "github.com/blevesearch/bleve/v2/search/query" 11 + "tangled.org/core/appview/db" 12 + "tangled.org/core/appview/indexer/base36" 13 + "tangled.org/core/appview/indexer/bleve" 14 + "tangled.org/core/appview/models" 15 + "tangled.org/core/appview/pagination" 16 + tlog "tangled.org/core/log" 17 + ) 18 + 19 + type Indexer struct { 20 + indexer bleve.Index 21 + path string 22 + } 23 + 24 + func NewIndexer(indexDir string) *Indexer { 25 + return &Indexer{ 26 + path: indexDir, 27 + } 28 + } 29 + 30 + // Init initializes the indexer 31 + func (ix *Indexer) Init(ctx context.Context, e db.Execer) { 32 + l := tlog.FromContext(ctx) 33 + existed, err := ix.intialize(ctx) 34 + if err != nil { 35 + l.Error("failed to initialize issue indexer", "err", err) 36 + } 37 + if !existed { 38 + l.Debug("Populating the issue indexer") 39 + err := PopulateIndexer(ctx, ix, e) 40 + if err != nil { 41 + l.Error("failed to populate issue indexer", "err", err) 42 + } 43 + } 44 + l.Info("Initialized the issue indexer") 45 + } 46 + 47 + func (ix *Indexer) intialize(ctx context.Context) (bool, error) { 48 + if ix.indexer != nil { 49 + return false, errors.New("indexer is already initialized") 50 + } 51 + 52 + indexer, err := openIndexer(ctx, ix.path) 53 + if err != nil { 54 + return false, err 55 + } 56 + if indexer != nil { 57 + ix.indexer = indexer 58 + return true, nil 59 + } 60 + 61 + mapping := bleve.NewIndexMapping() 62 + indexer, err = bleve.New(ix.path, mapping) 63 + if err != nil { 64 + return false, err 65 + } 66 + 67 + ix.indexer = indexer 68 + 69 + return false, nil 70 + } 71 + 72 + func openIndexer(ctx context.Context, path string) (bleve.Index, error) { 73 + l := tlog.FromContext(ctx) 74 + indexer, err := bleve.Open(path) 75 + if err != nil { 76 + if errors.Is(err, upsidedown.IncompatibleVersion) { 77 
+ l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") 78 + return nil, os.RemoveAll(path) 79 + } 80 + return nil, nil 81 + } 82 + return indexer, nil 83 + } 84 + 85 + func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { 86 + l := tlog.FromContext(ctx) 87 + count := 0 88 + err := pagination.IterateAll( 89 + func(page pagination.Page) ([]models.Issue, error) { 90 + return db.GetIssuesPaginated(e, page) 91 + }, 92 + func(issues []models.Issue) error { 93 + var dataList []*IssueData 94 + for _, issue := range issues { 95 + dataList = append(dataList, &IssueData{ 96 + ID: issue.Id, 97 + IssueID: issue.IssueId, 98 + Title: issue.Title, 99 + Body: issue.Body, 100 + IsOpen: issue.Open, 101 + }) 102 + err := ix.Index(ctx, dataList...) 103 + if err != nil { 104 + return err 105 + } 106 + } 107 + return nil 108 + }, 109 + ) 110 + l.Info("issues indexed", "count", count) 111 + return err 112 + } 113 + 114 + // IssueData data stored and will be indexed 115 + type IssueData struct { 116 + ID int64 `json:"id"` 117 + IssueID int `json:"issue_id"` 118 + Title string `json:"title"` 119 + Body string `json:"body"` 120 + 121 + IsOpen bool `json:"is_open"` 122 + Comments []IssueCommentData `json:"comments"` 123 + } 124 + 125 + type IssueCommentData struct { 126 + Body string `json:"body"` 127 + } 128 + 129 + type SearchResult struct { 130 + Hits []int64 131 + Total uint64 132 + } 133 + 134 + const maxBatchSize = 20 135 + 136 + func (ix *Indexer) Index(ctx context.Context, issues ...*IssueData) error { 137 + batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) 138 + for _, issue := range issues { 139 + if err := batch.Index(base36.Encode(issue.ID), issue); err != nil { 140 + return err 141 + } 142 + } 143 + return batch.Flush() 144 + } 145 + 146 + // Search searches for issues 147 + func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) { 148 + var queries []query.Query 149 + 
150 + if opts.Keyword != "" { 151 + queries = append(queries, bleve.NewDisjunctionQuery( 152 + matchAndQuery(opts.Keyword, "title"), 153 + matchAndQuery(opts.Keyword, "body"), 154 + )) 155 + } 156 + queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open")) 157 + // TODO: append more queries 158 + 159 + var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...) 160 + searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false) 161 + res, err := ix.indexer.SearchInContext(ctx, searchReq) 162 + if err != nil { 163 + return nil, nil 164 + } 165 + ret := &SearchResult{ 166 + Total: res.Total, 167 + Hits: make([]int64, len(res.Hits)), 168 + } 169 + for i, hit := range res.Hits { 170 + id, err := base36.Decode(hit.ID) 171 + if err != nil { 172 + return nil, err 173 + } 174 + ret.Hits[i] = id 175 + } 176 + return ret, nil 177 + } 178 + 179 + func matchAndQuery(keyword, field string) query.Query { 180 + q := bleve.NewMatchQuery(keyword) 181 + q.FieldVal = field 182 + return q 183 + } 184 + 185 + func boolFieldQuery(val bool, field string) query.Query { 186 + q := bleve.NewBoolFieldQuery(val) 187 + q.FieldVal = field 188 + return q 189 + }
+26
appview/indexer/notifier.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + 6 + issues_indexer "tangled.org/core/appview/indexer/issues" 7 + "tangled.org/core/appview/models" 8 + "tangled.org/core/appview/notify" 9 + ) 10 + 11 + var _ notify.Notifier = &Indexer{} 12 + 13 + func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) { 14 + ix.Issues.Index(ctx, &issues_indexer.IssueData{ 15 + ID: issue.Id, 16 + IssueID: issue.IssueId, 17 + Title: issue.Title, 18 + Body: issue.Body, 19 + IsOpen: issue.Open, 20 + Comments: []issues_indexer.IssueCommentData{}, 21 + }) 22 + } 23 + 24 + func (ix *Indexer) NewPullComment(ctx context.Context, comment *models.PullComment) { 25 + panic("unimplemented") 26 + }
+4
appview/issues/issues.go
··· 19 19 "tangled.org/core/api/tangled" 20 20 "tangled.org/core/appview/config" 21 21 "tangled.org/core/appview/db" 22 + issues_indexer "tangled.org/core/appview/indexer/issues" 22 23 "tangled.org/core/appview/models" 23 24 "tangled.org/core/appview/notify" 24 25 "tangled.org/core/appview/oauth" ··· 40 41 notifier notify.Notifier 41 42 logger *slog.Logger 42 43 validator *validator.Validator 44 + indexer *issues_indexer.Indexer 43 45 } 44 46 45 47 func New( ··· 51 53 config *config.Config, 52 54 notifier notify.Notifier, 53 55 validator *validator.Validator, 56 + indexer *issues_indexer.Indexer, 54 57 logger *slog.Logger, 55 58 ) *Issues { 56 59 return &Issues{ ··· 63 66 notifier: notifier, 64 67 logger: logger, 65 68 validator: validator, 69 + indexer: indexer, 66 70 } 67 71 } 68 72
+23
appview/models/search.go
package models

import "tangled.org/core/appview/pagination"

// IssueSearchOptions describes one query against the issue search index.
type IssueSearchOptions struct {
	Keyword string // free-text keyword; the indexer matches it against title and body
	RepoAt  string // repo AT-URI to scope the search to — currently unused by Search; TODO confirm intended semantics
	IsOpen  bool   // restrict results to open (true) or closed (false) issues

	Page pagination.Page // pagination window (limit/offset) for the results
}

// NOTE(review): the commented-out sketch below predates this struct — it
// names the type SearchOptions and dereferences IsOpen as a pointer, neither
// of which matches the definition above; update or drop before reviving it.
// func (so *SearchOptions) ToFilters() []filter {
// 	var filters []filter
// 	if so.IsOpen != nil {
// 		openValue := 0
// 		if *so.IsOpen {
// 			openValue = 1
// 		}
// 		filters = append(filters, FilterEq("open", openValue))
// 	}
// 	return filters
// }
+1
appview/pages/pages.go
··· 970 970 LabelDefs map[string]*models.LabelDefinition 971 971 Page pagination.Page 972 972 FilteringByOpen bool 973 + FilterQuery string 973 974 } 974 975 975 976 func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {
+23
appview/pagination/page.go
··· 29 29 Limit: p.Limit, 30 30 } 31 31 } 32 + 33 + func IterateAll[T any]( 34 + fetch func(page Page) ([]T, error), 35 + handle func(items []T) error, 36 + ) error { 37 + page := FirstPage() 38 + for { 39 + items, err := fetch(page) 40 + if err != nil { 41 + return err 42 + } 43 + 44 + err = handle(items) 45 + if err != nil { 46 + return err 47 + } 48 + if len(items) < page.Limit { 49 + break 50 + } 51 + page = page.Next() 52 + } 53 + return nil 54 + }
+1
appview/state/router.go
··· 262 262 s.config, 263 263 s.notifier, 264 264 s.validator, 265 + s.indexer.Issues, 265 266 log.SubLogger(s.logger, "issues"), 266 267 ) 267 268 return issues.Router(mw)
+10
appview/state/state.go
··· 14 14 "tangled.org/core/appview" 15 15 "tangled.org/core/appview/config" 16 16 "tangled.org/core/appview/db" 17 + "tangled.org/core/appview/indexer" 17 18 "tangled.org/core/appview/models" 18 19 "tangled.org/core/appview/notify" 19 20 dbnotify "tangled.org/core/appview/notify/db" ··· 43 44 type State struct { 44 45 db *db.DB 45 46 notifier notify.Notifier 47 + indexer *indexer.Indexer 46 48 oauth *oauth.OAuth 47 49 enforcer *rbac.Enforcer 48 50 pages *pages.Pages ··· 65 67 return nil, fmt.Errorf("failed to create db: %w", err) 66 68 } 67 69 70 + indexer := indexer.New(log.SubLogger(logger, "indexer")) 71 + err = indexer.Init(ctx, d) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to create indexer: %w", err) 74 + } 75 + 68 76 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 69 77 if err != nil { 70 78 return nil, fmt.Errorf("failed to create enforcer: %w", err) ··· 159 167 if !config.Core.Dev { 160 168 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 161 169 } 170 + notifiers = append(notifiers, indexer) 162 171 notifier := notify.NewMergedNotifier(notifiers...) 163 172 164 173 state := &State{ 165 174 d, 166 175 notifier, 176 + indexer, 167 177 oauth, 168 178 enforcer, 169 179 pages,

History

15 rounds 2 comments
sign up or login to add to the discussion
1 commit
expand
appview: add basic issue indexer
expand 0 comments
pull request successfully merged
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment

this changeset largely lgtm!

1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment
  • this is an N+1 query, because we first get all issues and then make one query for each issue. understandably this only needs to happen when populating the indices but we could improve this by just making one query
  • we can use a pagination free API for getting issues with comments while populating indexer
  • more of a note to self: search index should also handle issue deletions
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer (wip)
expand 0 comments