Monorepo for Tangled tangled.org

appview: add basic issue indexer (wip) #494

merged opened by boltless.me targeting master from boltless.me/core: feat/search
  • Heavily inspired by gitea
  • add GetAllIssues which only receives a paginator and gathers all issues ignoring repoAt field

Signed-off-by: Seongmin Lee boltlessengineer@proton.me

Labels

None yet.

Assignees

None yet.

Participants 2
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3lwecnvoz4z22
+398
Diff #14
+1
.gitignore
··· 15 15 .env 16 16 *.rdb 17 17 .envrc 18 + *.bleve 18 19 # Created if following hacking.md 19 20 genjwks.out 20 21 /nix/vm-data
+20
appview/indexer/base36/base36.go
··· 1 + // mostly copied from gitea/modules/indexer/internal/base32 2 + 3 + package base36 4 + 5 + import ( 6 + "fmt" 7 + "strconv" 8 + ) 9 + 10 + func Encode(i int64) string { 11 + return strconv.FormatInt(i, 36) 12 + } 13 + 14 + func Decode(s string) (int64, error) { 15 + i, err := strconv.ParseInt(s, 36, 64) 16 + if err != nil { 17 + return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err) 18 + } 19 + return i, nil 20 + }
+58
appview/indexer/bleve/batch.go
··· 1 + // Copyright 2021 The Gitea Authors. All rights reserved. 2 + // SPDX-License-Identifier: MIT 3 + 4 + package bleveutil 5 + 6 + import ( 7 + "github.com/blevesearch/bleve/v2" 8 + ) 9 + 10 + // FlushingBatch is a batch of operations that automatically flushes to the 11 + // underlying index once it reaches a certain size. 12 + type FlushingBatch struct { 13 + maxBatchSize int 14 + batch *bleve.Batch 15 + index bleve.Index 16 + } 17 + 18 + // NewFlushingBatch creates a new flushing batch for the specified index. Once 19 + // the number of operations in the batch reaches the specified limit, the batch 20 + // automatically flushes its operations to the index. 21 + func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { 22 + return &FlushingBatch{ 23 + maxBatchSize: maxBatchSize, 24 + batch: index.NewBatch(), 25 + index: index, 26 + } 27 + } 28 + 29 + // Index add a new index to batch 30 + func (b *FlushingBatch) Index(id string, data any) error { 31 + if err := b.batch.Index(id, data); err != nil { 32 + return err 33 + } 34 + return b.flushIfFull() 35 + } 36 + 37 + // Delete add a delete index to batch 38 + func (b *FlushingBatch) Delete(id string) error { 39 + b.batch.Delete(id) 40 + return b.flushIfFull() 41 + } 42 + 43 + func (b *FlushingBatch) flushIfFull() error { 44 + if b.batch.Size() < b.maxBatchSize { 45 + return nil 46 + } 47 + return b.Flush() 48 + } 49 + 50 + // Flush submit the batch and create a new one 51 + func (b *FlushingBatch) Flush() error { 52 + err := b.index.Batch(b.batch) 53 + if err != nil { 54 + return err 55 + } 56 + b.batch = b.index.NewBatch() 57 + return nil 58 + }
+24
appview/indexer/bleve/query.go
··· 1 + package bleveutil 2 + 3 + import ( 4 + "github.com/blevesearch/bleve/v2" 5 + "github.com/blevesearch/bleve/v2/search/query" 6 + ) 7 + 8 + func MatchAndQuery(field, keyword string) query.Query { 9 + q := bleve.NewMatchQuery(keyword) 10 + q.FieldVal = field 11 + return q 12 + } 13 + 14 + func BoolFieldQuery(field string, val bool) query.Query { 15 + q := bleve.NewBoolFieldQuery(val) 16 + q.FieldVal = field 17 + return q 18 + } 19 + 20 + func KeywordFieldQuery(field, keyword string) query.Query { 21 + q := bleve.NewTermQuery(keyword) 22 + q.FieldVal = field 23 + return q 24 + }
+32
appview/indexer/indexer.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + 7 + "tangled.org/core/appview/db" 8 + issues_indexer "tangled.org/core/appview/indexer/issues" 9 + "tangled.org/core/appview/notify" 10 + tlog "tangled.org/core/log" 11 + ) 12 + 13 + type Indexer struct { 14 + Issues *issues_indexer.Indexer 15 + logger *slog.Logger 16 + notify.BaseNotifier 17 + } 18 + 19 + func New(logger *slog.Logger) *Indexer { 20 + return &Indexer{ 21 + issues_indexer.NewIndexer("indexes.bleve"), 22 + logger, 23 + notify.BaseNotifier{}, 24 + } 25 + } 26 + 27 + // Init initializes all indexers 28 + func (ix *Indexer) Init(ctx context.Context, db *db.DB) error { 29 + ctx = tlog.IntoContext(ctx, ix.logger) 30 + ix.Issues.Init(ctx, db) 31 + return nil 32 + }
+180
appview/indexer/issues/indexer.go
··· 1 + // heavily inspired by gitea's model (basically copy-pasted) 2 + package issues_indexer 3 + 4 + import ( 5 + "context" 6 + "errors" 7 + "log" 8 + "os" 9 + 10 + "github.com/blevesearch/bleve/v2" 11 + "github.com/blevesearch/bleve/v2/index/upsidedown" 12 + "github.com/blevesearch/bleve/v2/search/query" 13 + "tangled.org/core/appview/db" 14 + "tangled.org/core/appview/indexer/base36" 15 + "tangled.org/core/appview/indexer/bleve" 16 + "tangled.org/core/appview/models" 17 + "tangled.org/core/appview/pagination" 18 + tlog "tangled.org/core/log" 19 + ) 20 + 21 + type Indexer struct { 22 + indexer bleve.Index 23 + path string 24 + } 25 + 26 + func NewIndexer(indexDir string) *Indexer { 27 + return &Indexer{ 28 + path: indexDir, 29 + } 30 + } 31 + 32 + // Init initializes the indexer 33 + func (ix *Indexer) Init(ctx context.Context, e db.Execer) { 34 + l := tlog.FromContext(ctx) 35 + existed, err := ix.intialize(ctx) 36 + if err != nil { 37 + log.Fatalln("failed to initialize issue indexer", err) 38 + } 39 + if !existed { 40 + l.Debug("Populating the issue indexer") 41 + err := PopulateIndexer(ctx, ix, e) 42 + if err != nil { 43 + log.Fatalln("failed to populate issue indexer", err) 44 + } 45 + } 46 + l.Info("Initialized the issue indexer") 47 + } 48 + 49 + func (ix *Indexer) intialize(ctx context.Context) (bool, error) { 50 + if ix.indexer != nil { 51 + return false, errors.New("indexer is already initialized") 52 + } 53 + 54 + indexer, err := openIndexer(ctx, ix.path) 55 + if err != nil { 56 + return false, err 57 + } 58 + if indexer != nil { 59 + ix.indexer = indexer 60 + return true, nil 61 + } 62 + 63 + mapping := bleve.NewIndexMapping() 64 + indexer, err = bleve.New(ix.path, mapping) 65 + if err != nil { 66 + return false, err 67 + } 68 + 69 + ix.indexer = indexer 70 + 71 + return false, nil 72 + } 73 + 74 + func openIndexer(ctx context.Context, path string) (bleve.Index, error) { 75 + l := tlog.FromContext(ctx) 76 + indexer, err := bleve.Open(path) 77 + if 
err != nil { 78 + if errors.Is(err, upsidedown.IncompatibleVersion) { 79 + l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") 80 + return nil, os.RemoveAll(path) 81 + } 82 + return nil, nil 83 + } 84 + return indexer, nil 85 + } 86 + 87 + func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { 88 + l := tlog.FromContext(ctx) 89 + count := 0 90 + err := pagination.IterateAll( 91 + func(page pagination.Page) ([]models.Issue, error) { 92 + return db.GetIssuesPaginated(e, page) 93 + }, 94 + func(issues []models.Issue) error { 95 + count += len(issues) 96 + return ix.Index(ctx, issues...) 97 + }, 98 + ) 99 + l.Info("issues indexed", "count", count) 100 + return err 101 + } 102 + 103 + // issueData data stored and will be indexed 104 + type issueData struct { 105 + ID int64 `json:"id"` 106 + RepoAt string `json:"repo_at"` 107 + IssueID int `json:"issue_id"` 108 + Title string `json:"title"` 109 + Body string `json:"body"` 110 + 111 + IsOpen bool `json:"is_open"` 112 + Comments []IssueCommentData `json:"comments"` 113 + } 114 + 115 + func makeIssueData(issue *models.Issue) *issueData { 116 + return &issueData{ 117 + ID: issue.Id, 118 + RepoAt: issue.RepoAt.String(), 119 + IssueID: issue.IssueId, 120 + Title: issue.Title, 121 + Body: issue.Body, 122 + IsOpen: issue.Open, 123 + } 124 + } 125 + 126 + type IssueCommentData struct { 127 + Body string `json:"body"` 128 + } 129 + 130 + type SearchResult struct { 131 + Hits []int64 132 + Total uint64 133 + } 134 + 135 + const maxBatchSize = 20 136 + 137 + func (ix *Indexer) Index(ctx context.Context, issues ...models.Issue) error { 138 + batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) 139 + for _, issue := range issues { 140 + issueData := makeIssueData(&issue) 141 + if err := batch.Index(base36.Encode(issue.Id), issueData); err != nil { 142 + return err 143 + } 144 + } 145 + return batch.Flush() 146 + } 147 + 148 + // Search searches for issues 149 + func (ix 
*Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) { 150 + var queries []query.Query 151 + 152 + if opts.Keyword != "" { 153 + queries = append(queries, bleve.NewDisjunctionQuery( 154 + bleveutil.MatchAndQuery("title", opts.Keyword), 155 + bleveutil.MatchAndQuery("body", opts.Keyword), 156 + )) 157 + } 158 + queries = append(queries, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt)) 159 + queries = append(queries, bleveutil.BoolFieldQuery("is_open", opts.IsOpen)) 160 + // TODO: append more queries 161 + 162 + var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...) 163 + searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false) 164 + res, err := ix.indexer.SearchInContext(ctx, searchReq) 165 + if err != nil { 166 + return nil, nil 167 + } 168 + ret := &SearchResult{ 169 + Total: res.Total, 170 + Hits: make([]int64, len(res.Hits)), 171 + } 172 + for i, hit := range res.Hits { 173 + id, err := base36.Decode(hit.ID) 174 + if err != nil { 175 + return nil, err 176 + } 177 + ret.Hits[i] = id 178 + } 179 + return ret, nil 180 + }
+20
appview/indexer/notifier.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + 6 + "tangled.org/core/appview/models" 7 + "tangled.org/core/appview/notify" 8 + "tangled.org/core/log" 9 + ) 10 + 11 + var _ notify.Notifier = &Indexer{} 12 + 13 + func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) { 14 + l := log.FromContext(ctx).With("notifier", "indexer.NewIssue", "issue", issue) 15 + l.Debug("indexing new issue") 16 + err := ix.Issues.Index(ctx, *issue) 17 + if err != nil { 18 + l.Error("failed to index an issue", "err", err) 19 + } 20 + }
+5
appview/issues/issues.go
··· 19 19 "tangled.org/core/api/tangled" 20 20 "tangled.org/core/appview/config" 21 21 "tangled.org/core/appview/db" 22 + issues_indexer "tangled.org/core/appview/indexer/issues" 22 23 "tangled.org/core/appview/models" 23 24 "tangled.org/core/appview/notify" 24 25 "tangled.org/core/appview/oauth" ··· 40 41 notifier notify.Notifier 41 42 logger *slog.Logger 42 43 validator *validator.Validator 44 + indexer *issues_indexer.Indexer 43 45 } 44 46 45 47 func New( ··· 51 53 config *config.Config, 52 54 notifier notify.Notifier, 53 55 validator *validator.Validator, 56 + indexer *issues_indexer.Indexer, 54 57 logger *slog.Logger, 55 58 ) *Issues { 56 59 return &Issues{ ··· 63 66 notifier: notifier, 64 67 logger: logger, 65 68 validator: validator, 69 + indexer: indexer, 66 70 } 67 71 } 68 72 ··· 843 847 Rkey: tid.TID(), 844 848 Title: r.FormValue("title"), 845 849 Body: r.FormValue("body"), 850 + Open: true, 846 851 Did: user.Did, 847 852 Created: time.Now(), 848 853 Repo: &f.Repo,
+23
appview/models/search.go
··· 1 + package models 2 + 3 + import "tangled.org/core/appview/pagination" 4 + 5 + type IssueSearchOptions struct { 6 + Keyword string 7 + RepoAt string 8 + IsOpen bool 9 + 10 + Page pagination.Page 11 + } 12 + 13 + // func (so *SearchOptions) ToFilters() []filter { 14 + // var filters []filter 15 + // if so.IsOpen != nil { 16 + // openValue := 0 17 + // if *so.IsOpen { 18 + // openValue = 1 19 + // } 20 + // filters = append(filters, FilterEq("open", openValue)) 21 + // } 22 + // return filters 23 + // }
+1
appview/pages/pages.go
··· 971 971 LabelDefs map[string]*models.LabelDefinition 972 972 Page pagination.Page 973 973 FilteringByOpen bool 974 + FilterQuery string 974 975 } 975 976 976 977 func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {
+23
appview/pagination/page.go
··· 52 52 Limit: p.Limit, 53 53 } 54 54 } 55 + 56 + func IterateAll[T any]( 57 + fetch func(page Page) ([]T, error), 58 + handle func(items []T) error, 59 + ) error { 60 + page := FirstPage() 61 + for { 62 + items, err := fetch(page) 63 + if err != nil { 64 + return err 65 + } 66 + 67 + err = handle(items) 68 + if err != nil { 69 + return err 70 + } 71 + if len(items) < page.Limit { 72 + break 73 + } 74 + page = page.Next() 75 + } 76 + return nil 77 + }
+1
appview/state/router.go
··· 262 262 s.config, 263 263 s.notifier, 264 264 s.validator, 265 + s.indexer.Issues, 265 266 log.SubLogger(s.logger, "issues"), 266 267 ) 267 268 return issues.Router(mw)
+10
appview/state/state.go
··· 14 14 "tangled.org/core/appview" 15 15 "tangled.org/core/appview/config" 16 16 "tangled.org/core/appview/db" 17 + "tangled.org/core/appview/indexer" 17 18 "tangled.org/core/appview/models" 18 19 "tangled.org/core/appview/notify" 19 20 dbnotify "tangled.org/core/appview/notify/db" ··· 43 44 type State struct { 44 45 db *db.DB 45 46 notifier notify.Notifier 47 + indexer *indexer.Indexer 46 48 oauth *oauth.OAuth 47 49 enforcer *rbac.Enforcer 48 50 pages *pages.Pages ··· 65 67 return nil, fmt.Errorf("failed to create db: %w", err) 66 68 } 67 69 70 + indexer := indexer.New(log.SubLogger(logger, "indexer")) 71 + err = indexer.Init(ctx, d) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to create indexer: %w", err) 74 + } 75 + 68 76 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 69 77 if err != nil { 70 78 return nil, fmt.Errorf("failed to create enforcer: %w", err) ··· 159 167 if !config.Core.Dev { 160 168 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 161 169 } 170 + notifiers = append(notifiers, indexer) 162 171 notifier := notify.NewMergedNotifier(notifiers...) 163 172 164 173 state := &State{ 165 174 d, 166 175 notifier, 176 + indexer, 167 177 oauth, 168 178 enforcer, 169 179 pages,

History

15 rounds 2 comments
sign up or login to add to the discussion
1 commit
expand
appview: add basic issue indexer
expand 0 comments
pull request successfully merged
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment

this changeset largely lgtm!

1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 1 comment
  • this is an N+1 query, because we first get all issues and then make one query for each issue. understandably this only needs to happen when populating the indices but we could improve this by just making one query
  • we can use a pagination free API for getting issues with comments while populating indexer
  • more of a note to self: search index should also handle issue deletions
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer
expand 0 comments
1 commit
expand
appview: add basic issue indexer (wip)
expand 0 comments