Monorepo for Tangled
at master 327 lines 9.1 kB view raw
1// heavily inspired by gitea's model (basically copy-pasted) 2package pulls_indexer 3 4import ( 5 "context" 6 "errors" 7 "log" 8 "os" 9 10 "github.com/blevesearch/bleve/v2" 11 "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" 12 "github.com/blevesearch/bleve/v2/analysis/token/camelcase" 13 "github.com/blevesearch/bleve/v2/analysis/token/lowercase" 14 "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" 15 "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" 16 "github.com/blevesearch/bleve/v2/index/upsidedown" 17 "github.com/blevesearch/bleve/v2/mapping" 18 "github.com/blevesearch/bleve/v2/search/query" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/indexer/base36" 21 bleveutil "tangled.org/core/appview/indexer/bleve" 22 "tangled.org/core/appview/models" 23 tlog "tangled.org/core/log" 24) 25 26const ( 27 pullIndexerAnalyzer = "pullIndexer" 28 pullIndexerDocType = "pullIndexerDocType" 29 30 unicodeNormalizeName = "uicodeNormalize" 31 32 // Bump this when the index mapping changes to trigger a rebuild. 33 pullIndexerVersion = 3 34) 35 36type Indexer struct { 37 indexer bleve.Index 38 path string 39} 40 41func NewIndexer(indexDir string) *Indexer { 42 return &Indexer{ 43 path: indexDir, 44 } 45} 46 47// Init initializes the indexer 48func (ix *Indexer) Init(ctx context.Context, e db.Execer) { 49 l := tlog.FromContext(ctx) 50 existed, err := ix.intialize(ctx) 51 if err != nil { 52 log.Fatalln("failed to initialize pull indexer", err) 53 } 54 if !existed { 55 l.Debug("Populating the pull indexer") 56 err := PopulateIndexer(ctx, ix, e) 57 if err != nil { 58 log.Fatalln("failed to populate pull indexer", err) 59 } 60 } 61 62 count, _ := ix.indexer.DocCount() 63 l.Info("Initialized the pull indexer", "docCount", count) 64} 65 66func generatePullIndexMapping() (mapping.IndexMapping, error) { 67 mapping := bleve.NewIndexMapping() 68 docMapping := bleve.NewDocumentMapping() 69 70 textFieldMapping := bleve.NewTextFieldMapping() 71 textFieldMapping.Store = false 72 textFieldMapping.IncludeInAll = false 73 74 keywordFieldMapping := bleve.NewKeywordFieldMapping() 75 keywordFieldMapping.Store = false 76 keywordFieldMapping.IncludeInAll = false 77 78 // numericFieldMapping := bleve.NewNumericFieldMapping() 79 80 docMapping.AddFieldMappingsAt("title", textFieldMapping) 81 docMapping.AddFieldMappingsAt("body", textFieldMapping) 82 83 docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping) 84 docMapping.AddFieldMappingsAt("state", keywordFieldMapping) 85 docMapping.AddFieldMappingsAt("author_did", keywordFieldMapping) 86 docMapping.AddFieldMappingsAt("labels", keywordFieldMapping) 87 docMapping.AddFieldMappingsAt("label_values", keywordFieldMapping) 88 89 err := mapping.AddCustomTokenFilter(unicodeNormalizeName, map[string]any{ 90 "type": unicodenorm.Name, 91 "form": unicodenorm.NFC, 92 }) 93 if err != nil { 94 return nil, err 95 } 96 97 err = mapping.AddCustomAnalyzer(pullIndexerAnalyzer, map[string]any{ 98 "type": custom.Name, 99 "char_filters": []string{}, 100 "tokenizer": unicode.Name, 101 "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name}, 102 }) 103 if err != nil { 104 return nil, err 105 } 106 107 mapping.DefaultAnalyzer = pullIndexerAnalyzer 108 mapping.AddDocumentMapping(pullIndexerDocType, docMapping) 109 mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) 110 mapping.DefaultMapping = bleve.NewDocumentDisabledMapping() 111 112 return mapping, nil 113} 114 115func (ix *Indexer) intialize(ctx context.Context) (bool, error) { 116 if ix.indexer != nil { 117 return false, errors.New("indexer is already initialized") 118 } 119 120 indexer, err := openIndexer(ctx, ix.path, pullIndexerVersion) 121 if err != nil { 122 return false, err 123 } 124 if indexer != nil { 125 ix.indexer = indexer 126 return true, nil 127 } 128 129 mapping, err := generatePullIndexMapping() 130 if err != nil { 131 return false, err 132 } 133 indexer, err = bleve.New(ix.path, mapping) 134 if err != nil { 135 return false, err 136 } 137 indexer.SetInternal([]byte("mapping_version"), []byte{byte(pullIndexerVersion)}) 138 139 ix.indexer = indexer 140 141 return false, nil 142} 143 144func openIndexer(ctx context.Context, path string, version int) (bleve.Index, error) { 145 l := tlog.FromContext(ctx) 146 indexer, err := bleve.Open(path) 147 if err != nil { 148 if errors.Is(err, upsidedown.IncompatibleVersion) { 149 l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") 150 return nil, os.RemoveAll(path) 151 } 152 return nil, nil 153 } 154 155 storedVersion, _ := indexer.GetInternal([]byte("mapping_version")) 156 if storedVersion == nil || int(storedVersion[0]) != version { 157 l.Info("Indexer mapping version changed, deleting and rebuilding") 158 indexer.Close() 159 return nil, os.RemoveAll(path) 160 } 161 162 return indexer, nil 163} 164 165func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { 166 l := tlog.FromContext(ctx) 167 168 pulls, err := db.GetPulls(e) 169 if err != nil { 170 return err 171 } 172 count := len(pulls) 173 err = ix.Index(ctx, pulls...) 174 if err != nil { 175 return err 176 } 177 l.Info("pulls indexed", "count", count) 178 return err 179} 180 181type pullData struct { 182 ID int64 `json:"id"` 183 RepoAt string `json:"repo_at"` 184 PullID int `json:"pull_id"` 185 Title string `json:"title"` 186 Body string `json:"body"` 187 State string `json:"state"` 188 AuthorDid string `json:"author_did"` 189 Labels []string `json:"labels"` 190 LabelValues []string `json:"label_values"` 191 192 Comments []pullCommentData `json:"comments"` 193} 194 195func makePullData(pull *models.Pull) *pullData { 196 return &pullData{ 197 ID: int64(pull.ID), 198 RepoAt: pull.RepoAt.String(), 199 PullID: pull.PullId, 200 Title: pull.Title, 201 Body: pull.Body, 202 State: pull.State.String(), 203 AuthorDid: pull.OwnerDid, 204 Labels: pull.Labels.LabelNames(), 205 LabelValues: pull.Labels.LabelNameValues(), 206 } 207} 208 209// Type returns the document type, for bleve's mapping.Classifier interface. 210func (i *pullData) Type() string { 211 return pullIndexerDocType 212} 213 214type pullCommentData struct { 215 Body string `json:"body"` 216} 217 218type searchResult struct { 219 Hits []int64 220 Total uint64 221} 222 223const maxBatchSize = 20 224 225func (ix *Indexer) Index(ctx context.Context, pulls ...*models.Pull) error { 226 batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) 227 for _, pull := range pulls { 228 pullData := makePullData(pull) 229 if err := batch.Index(base36.Encode(pullData.ID), pullData); err != nil { 230 return err 231 } 232 } 233 return batch.Flush() 234} 235 236func (ix *Indexer) Delete(ctx context.Context, pullID int64) error { 237 return ix.indexer.Delete(base36.Encode(pullID)) 238} 239 240func (ix *Indexer) Search(ctx context.Context, opts models.PullSearchOptions) (*searchResult, error) { 241 var musts []query.Query 242 var mustNots []query.Query 243 244 // TODO(boltless): remove this after implementing pulls page pagination 245 limit := opts.Page.Limit 246 if limit == 0 { 247 limit = 500 248 } 249 250 for _, keyword := range opts.Keywords { 251 musts = append(musts, bleve.NewDisjunctionQuery( 252 bleveutil.MatchAndQuery("title", keyword, pullIndexerAnalyzer, 0), 253 bleveutil.MatchAndQuery("body", keyword, pullIndexerAnalyzer, 0), 254 )) 255 } 256 257 for _, phrase := range opts.Phrases { 258 musts = append(musts, bleve.NewDisjunctionQuery( 259 bleveutil.MatchPhraseQuery("title", phrase, pullIndexerAnalyzer), 260 bleveutil.MatchPhraseQuery("body", phrase, pullIndexerAnalyzer), 261 )) 262 } 263 264 for _, keyword := range opts.NegatedKeywords { 265 mustNots = append(mustNots, bleve.NewDisjunctionQuery( 266 bleveutil.MatchAndQuery("title", keyword, pullIndexerAnalyzer, 0), 267 bleveutil.MatchAndQuery("body", keyword, pullIndexerAnalyzer, 0), 268 )) 269 } 270 271 for _, phrase := range opts.NegatedPhrases { 272 mustNots = append(mustNots, bleve.NewDisjunctionQuery( 273 bleveutil.MatchPhraseQuery("title", phrase, pullIndexerAnalyzer), 274 bleveutil.MatchPhraseQuery("body", phrase, pullIndexerAnalyzer), 275 )) 276 } 277 278 musts = append(musts, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt)) 279 if opts.State != nil { 280 musts = append(musts, bleveutil.KeywordFieldQuery("state", opts.State.String())) 281 } 282 283 if opts.AuthorDid != "" { 284 musts = append(musts, bleveutil.KeywordFieldQuery("author_did", opts.AuthorDid)) 285 } 286 287 for _, label := range opts.Labels { 288 musts = append(musts, bleveutil.KeywordFieldQuery("labels", label)) 289 } 290 291 for _, did := range opts.NegatedAuthorDids { 292 mustNots = append(mustNots, bleveutil.KeywordFieldQuery("author_did", did)) 293 } 294 295 for _, label := range opts.NegatedLabels { 296 mustNots = append(mustNots, bleveutil.KeywordFieldQuery("labels", label)) 297 } 298 299 for _, lv := range opts.LabelValues { 300 musts = append(musts, bleveutil.KeywordFieldQuery("label_values", lv)) 301 } 302 303 for _, lv := range opts.NegatedLabelValues { 304 mustNots = append(mustNots, bleveutil.KeywordFieldQuery("label_values", lv)) 305 } 306 307 indexerQuery := bleve.NewBooleanQuery() 308 indexerQuery.AddMust(musts...) 309 indexerQuery.AddMustNot(mustNots...) 310 searchReq := bleve.NewSearchRequestOptions(indexerQuery, limit, opts.Page.Offset, false) 311 res, err := ix.indexer.SearchInContext(ctx, searchReq) 312 if err != nil { 313 return nil, nil 314 } 315 ret := &searchResult{ 316 Total: res.Total, 317 Hits: make([]int64, len(res.Hits)), 318 } 319 for i, hit := range res.Hits { 320 id, err := base36.Decode(hit.ID) 321 if err != nil { 322 return nil, err 323 } 324 ret.Hits[i] = id 325 } 326 return ret, nil 327}