Monorepo for Tangled
1// heavily inspired by gitea's model (basically copy-pasted)
2package pulls_indexer
3
4import (
5 "context"
6 "errors"
7 "log"
8 "os"
9
10 "github.com/blevesearch/bleve/v2"
11 "github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
12 "github.com/blevesearch/bleve/v2/analysis/token/camelcase"
13 "github.com/blevesearch/bleve/v2/analysis/token/lowercase"
14 "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm"
15 "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode"
16 "github.com/blevesearch/bleve/v2/index/upsidedown"
17 "github.com/blevesearch/bleve/v2/mapping"
18 "github.com/blevesearch/bleve/v2/search/query"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/indexer/base36"
21 "tangled.org/core/appview/indexer/bleve"
22 "tangled.org/core/appview/models"
23 tlog "tangled.org/core/log"
24)
25
// Names under which the pull indexer registers its pieces in the bleve
// index mapping.
const (
	// pullIndexerAnalyzer names the custom analyzer used for pull text and
	// for the text queries built by Search.
	pullIndexerAnalyzer = "pullIndexer"

	// pullIndexerDocType is the document type reported by pullData.Type and
	// the key of the pull document mapping.
	pullIndexerDocType = "pullIndexerDocType"
)

// unicodeNormalizeName is the key under which the unicode normalization
// token filter is registered. The value is only a lookup key inside the
// mapping; its spelling is kept as-is.
const unicodeNormalizeName = "uicodeNormalize"

// pullIndexerVersion is stored inside the index and compared on open.
// Bump this when the index mapping changes to trigger a rebuild.
const pullIndexerVersion = 2
35
36type Indexer struct {
37 indexer bleve.Index
38 path string
39}
40
41func NewIndexer(indexDir string) *Indexer {
42 return &Indexer{
43 path: indexDir,
44 }
45}
46
47// Init initializes the indexer
48func (ix *Indexer) Init(ctx context.Context, e db.Execer) {
49 l := tlog.FromContext(ctx)
50 existed, err := ix.intialize(ctx)
51 if err != nil {
52 log.Fatalln("failed to initialize pull indexer", err)
53 }
54 if !existed {
55 l.Debug("Populating the pull indexer")
56 err := PopulateIndexer(ctx, ix, e)
57 if err != nil {
58 log.Fatalln("failed to populate pull indexer", err)
59 }
60 }
61
62 count, _ := ix.indexer.DocCount()
63 l.Info("Initialized the pull indexer", "docCount", count)
64}
65
66func generatePullIndexMapping() (mapping.IndexMapping, error) {
67 mapping := bleve.NewIndexMapping()
68 docMapping := bleve.NewDocumentMapping()
69
70 textFieldMapping := bleve.NewTextFieldMapping()
71 textFieldMapping.Store = false
72 textFieldMapping.IncludeInAll = false
73
74 keywordFieldMapping := bleve.NewKeywordFieldMapping()
75 keywordFieldMapping.Store = false
76 keywordFieldMapping.IncludeInAll = false
77
78 // numericFieldMapping := bleve.NewNumericFieldMapping()
79
80 docMapping.AddFieldMappingsAt("title", textFieldMapping)
81 docMapping.AddFieldMappingsAt("body", textFieldMapping)
82
83 docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping)
84 docMapping.AddFieldMappingsAt("state", keywordFieldMapping)
85 docMapping.AddFieldMappingsAt("author_did", keywordFieldMapping)
86 docMapping.AddFieldMappingsAt("labels", keywordFieldMapping)
87
88 err := mapping.AddCustomTokenFilter(unicodeNormalizeName, map[string]any{
89 "type": unicodenorm.Name,
90 "form": unicodenorm.NFC,
91 })
92 if err != nil {
93 return nil, err
94 }
95
96 err = mapping.AddCustomAnalyzer(pullIndexerAnalyzer, map[string]any{
97 "type": custom.Name,
98 "char_filters": []string{},
99 "tokenizer": unicode.Name,
100 "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name},
101 })
102 if err != nil {
103 return nil, err
104 }
105
106 mapping.DefaultAnalyzer = pullIndexerAnalyzer
107 mapping.AddDocumentMapping(pullIndexerDocType, docMapping)
108 mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
109 mapping.DefaultMapping = bleve.NewDocumentDisabledMapping()
110
111 return mapping, nil
112}
113
114func (ix *Indexer) intialize(ctx context.Context) (bool, error) {
115 if ix.indexer != nil {
116 return false, errors.New("indexer is already initialized")
117 }
118
119 indexer, err := openIndexer(ctx, ix.path, pullIndexerVersion)
120 if err != nil {
121 return false, err
122 }
123 if indexer != nil {
124 ix.indexer = indexer
125 return true, nil
126 }
127
128 mapping, err := generatePullIndexMapping()
129 if err != nil {
130 return false, err
131 }
132 indexer, err = bleve.New(ix.path, mapping)
133 if err != nil {
134 return false, err
135 }
136 indexer.SetInternal([]byte("mapping_version"), []byte{byte(pullIndexerVersion)})
137
138 ix.indexer = indexer
139
140 return false, nil
141}
142
143func openIndexer(ctx context.Context, path string, version int) (bleve.Index, error) {
144 l := tlog.FromContext(ctx)
145 indexer, err := bleve.Open(path)
146 if err != nil {
147 if errors.Is(err, upsidedown.IncompatibleVersion) {
148 l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding")
149 return nil, os.RemoveAll(path)
150 }
151 return nil, nil
152 }
153
154 storedVersion, _ := indexer.GetInternal([]byte("mapping_version"))
155 if storedVersion == nil || int(storedVersion[0]) != version {
156 l.Info("Indexer mapping version changed, deleting and rebuilding")
157 indexer.Close()
158 return nil, os.RemoveAll(path)
159 }
160
161 return indexer, nil
162}
163
164func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error {
165 l := tlog.FromContext(ctx)
166
167 pulls, err := db.GetPulls(e)
168 if err != nil {
169 return err
170 }
171 count := len(pulls)
172 err = ix.Index(ctx, pulls...)
173 if err != nil {
174 return err
175 }
176 l.Info("pulls indexed", "count", count)
177 return err
178}
179
180type pullData struct {
181 ID int64 `json:"id"`
182 RepoAt string `json:"repo_at"`
183 PullID int `json:"pull_id"`
184 Title string `json:"title"`
185 Body string `json:"body"`
186 State string `json:"state"`
187 AuthorDid string `json:"author_did"`
188 Labels []string `json:"labels"`
189
190 Comments []pullCommentData `json:"comments"`
191}
192
193func makePullData(pull *models.Pull) *pullData {
194 return &pullData{
195 ID: int64(pull.ID),
196 RepoAt: pull.RepoAt.String(),
197 PullID: pull.PullId,
198 Title: pull.Title,
199 Body: pull.Body,
200 State: pull.State.String(),
201 AuthorDid: pull.OwnerDid,
202 Labels: pull.Labels.LabelNames(),
203 }
204}
205
206// Type returns the document type, for bleve's mapping.Classifier interface.
207func (i *pullData) Type() string {
208 return pullIndexerDocType
209}
210
// pullCommentData is the indexed form of a single pull comment.
type pullCommentData struct {
	// Body is the comment text.
	Body string `json:"body"`
}
214
// searchResult is what Search returns for one query.
type searchResult struct {
	// Hits holds the IDs decoded from the document IDs of the returned hits,
	// in the order the index returned them.
	Hits []int64
	// Total is the total reported by the index for the query.
	Total uint64
}
219
// maxBatchSize is the batch size Index passes to the flushing batch.
const maxBatchSize = 20
221
222func (ix *Indexer) Index(ctx context.Context, pulls ...*models.Pull) error {
223 batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize)
224 for _, pull := range pulls {
225 pullData := makePullData(pull)
226 if err := batch.Index(base36.Encode(pullData.ID), pullData); err != nil {
227 return err
228 }
229 }
230 return batch.Flush()
231}
232
233func (ix *Indexer) Delete(ctx context.Context, pullID int64) error {
234 return ix.indexer.Delete(base36.Encode(pullID))
235}
236
237func (ix *Indexer) Search(ctx context.Context, opts models.PullSearchOptions) (*searchResult, error) {
238 var musts []query.Query
239 var mustNots []query.Query
240
241 // TODO(boltless): remove this after implementing pulls page pagination
242 limit := opts.Page.Limit
243 if limit == 0 {
244 limit = 500
245 }
246
247 for _, keyword := range opts.Keywords {
248 musts = append(musts, bleve.NewDisjunctionQuery(
249 bleveutil.MatchAndQuery("title", keyword, pullIndexerAnalyzer, 0),
250 bleveutil.MatchAndQuery("body", keyword, pullIndexerAnalyzer, 0),
251 ))
252 }
253
254 for _, phrase := range opts.Phrases {
255 musts = append(musts, bleve.NewDisjunctionQuery(
256 bleveutil.MatchPhraseQuery("title", phrase, pullIndexerAnalyzer),
257 bleveutil.MatchPhraseQuery("body", phrase, pullIndexerAnalyzer),
258 ))
259 }
260
261 for _, keyword := range opts.NegatedKeywords {
262 mustNots = append(mustNots, bleve.NewDisjunctionQuery(
263 bleveutil.MatchAndQuery("title", keyword, pullIndexerAnalyzer, 0),
264 bleveutil.MatchAndQuery("body", keyword, pullIndexerAnalyzer, 0),
265 ))
266 }
267
268 for _, phrase := range opts.NegatedPhrases {
269 mustNots = append(mustNots, bleve.NewDisjunctionQuery(
270 bleveutil.MatchPhraseQuery("title", phrase, pullIndexerAnalyzer),
271 bleveutil.MatchPhraseQuery("body", phrase, pullIndexerAnalyzer),
272 ))
273 }
274
275 musts = append(musts, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt))
276 if opts.State != nil {
277 musts = append(musts, bleveutil.KeywordFieldQuery("state", opts.State.String()))
278 }
279
280 if opts.AuthorDid != "" {
281 musts = append(musts, bleveutil.KeywordFieldQuery("author_did", opts.AuthorDid))
282 }
283
284 for _, label := range opts.Labels {
285 musts = append(musts, bleveutil.KeywordFieldQuery("labels", label))
286 }
287
288 if opts.NegatedAuthorDid != "" {
289 mustNots = append(mustNots, bleveutil.KeywordFieldQuery("author_did", opts.NegatedAuthorDid))
290 }
291
292 for _, label := range opts.NegatedLabels {
293 mustNots = append(mustNots, bleveutil.KeywordFieldQuery("labels", label))
294 }
295
296 indexerQuery := bleve.NewBooleanQuery()
297 indexerQuery.AddMust(musts...)
298 indexerQuery.AddMustNot(mustNots...)
299 searchReq := bleve.NewSearchRequestOptions(indexerQuery, limit, opts.Page.Offset, false)
300 res, err := ix.indexer.SearchInContext(ctx, searchReq)
301 if err != nil {
302 return nil, nil
303 }
304 ret := &searchResult{
305 Total: res.Total,
306 Hits: make([]int64, len(res.Hits)),
307 }
308 for i, hit := range res.Hits {
309 id, err := base36.Decode(hit.ID)
310 if err != nil {
311 return nil, err
312 }
313 ret.Hits[i] = id
314 }
315 return ret, nil
316}