Monorepo for Tangled
at f7ac5765ea4318ea9bb2cddc41c6a4add06e8aae 316 lines 8.7 kB view raw
1// heavily inspired by gitea's model (basically copy-pasted) 2package pulls_indexer 3 4import ( 5 "context" 6 "errors" 7 "log" 8 "os" 9 10 "github.com/blevesearch/bleve/v2" 11 "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" 12 "github.com/blevesearch/bleve/v2/analysis/token/camelcase" 13 "github.com/blevesearch/bleve/v2/analysis/token/lowercase" 14 "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" 15 "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" 16 "github.com/blevesearch/bleve/v2/index/upsidedown" 17 "github.com/blevesearch/bleve/v2/mapping" 18 "github.com/blevesearch/bleve/v2/search/query" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/indexer/base36" 21 "tangled.org/core/appview/indexer/bleve" 22 "tangled.org/core/appview/models" 23 tlog "tangled.org/core/log" 24) 25 26const ( 27 pullIndexerAnalyzer = "pullIndexer" 28 pullIndexerDocType = "pullIndexerDocType" 29 30 unicodeNormalizeName = "uicodeNormalize" 31 32 // Bump this when the index mapping changes to trigger a rebuild. 33 pullIndexerVersion = 2 34) 35 36type Indexer struct { 37 indexer bleve.Index 38 path string 39} 40 41func NewIndexer(indexDir string) *Indexer { 42 return &Indexer{ 43 path: indexDir, 44 } 45} 46 47// Init initializes the indexer 48func (ix *Indexer) Init(ctx context.Context, e db.Execer) { 49 l := tlog.FromContext(ctx) 50 existed, err := ix.intialize(ctx) 51 if err != nil { 52 log.Fatalln("failed to initialize pull indexer", err) 53 } 54 if !existed { 55 l.Debug("Populating the pull indexer") 56 err := PopulateIndexer(ctx, ix, e) 57 if err != nil { 58 log.Fatalln("failed to populate pull indexer", err) 59 } 60 } 61 62 count, _ := ix.indexer.DocCount() 63 l.Info("Initialized the pull indexer", "docCount", count) 64} 65 66func generatePullIndexMapping() (mapping.IndexMapping, error) { 67 mapping := bleve.NewIndexMapping() 68 docMapping := bleve.NewDocumentMapping() 69 70 textFieldMapping := bleve.NewTextFieldMapping() 71 textFieldMapping.Store = false 72 textFieldMapping.IncludeInAll = false 73 74 keywordFieldMapping := bleve.NewKeywordFieldMapping() 75 keywordFieldMapping.Store = false 76 keywordFieldMapping.IncludeInAll = false 77 78 // numericFieldMapping := bleve.NewNumericFieldMapping() 79 80 docMapping.AddFieldMappingsAt("title", textFieldMapping) 81 docMapping.AddFieldMappingsAt("body", textFieldMapping) 82 83 docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping) 84 docMapping.AddFieldMappingsAt("state", keywordFieldMapping) 85 docMapping.AddFieldMappingsAt("author_did", keywordFieldMapping) 86 docMapping.AddFieldMappingsAt("labels", keywordFieldMapping) 87 88 err := mapping.AddCustomTokenFilter(unicodeNormalizeName, map[string]any{ 89 "type": unicodenorm.Name, 90 "form": unicodenorm.NFC, 91 }) 92 if err != nil { 93 return nil, err 94 } 95 96 err = mapping.AddCustomAnalyzer(pullIndexerAnalyzer, map[string]any{ 97 "type": custom.Name, 98 "char_filters": []string{}, 99 "tokenizer": unicode.Name, 100 "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name}, 101 }) 102 if err != nil { 103 return nil, err 104 } 105 106 mapping.DefaultAnalyzer = pullIndexerAnalyzer 107 mapping.AddDocumentMapping(pullIndexerDocType, docMapping) 108 mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) 109 mapping.DefaultMapping = bleve.NewDocumentDisabledMapping() 110 111 return mapping, nil 112} 113 114func (ix *Indexer) intialize(ctx context.Context) (bool, error) { 115 if ix.indexer != nil { 116 return false, errors.New("indexer is already initialized") 117 } 118 119 indexer, err := openIndexer(ctx, ix.path, pullIndexerVersion) 120 if err != nil { 121 return false, err 122 } 123 if indexer != nil { 124 ix.indexer = indexer 125 return true, nil 126 } 127 128 mapping, err := generatePullIndexMapping() 129 if err != nil { 130 return false, err 131 } 132 indexer, err = bleve.New(ix.path, mapping) 133 if err != nil { 134 return false, err 135 } 136 indexer.SetInternal([]byte("mapping_version"), []byte{byte(pullIndexerVersion)}) 137 138 ix.indexer = indexer 139 140 return false, nil 141} 142 143func openIndexer(ctx context.Context, path string, version int) (bleve.Index, error) { 144 l := tlog.FromContext(ctx) 145 indexer, err := bleve.Open(path) 146 if err != nil { 147 if errors.Is(err, upsidedown.IncompatibleVersion) { 148 l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") 149 return nil, os.RemoveAll(path) 150 } 151 return nil, nil 152 } 153 154 storedVersion, _ := indexer.GetInternal([]byte("mapping_version")) 155 if storedVersion == nil || int(storedVersion[0]) != version { 156 l.Info("Indexer mapping version changed, deleting and rebuilding") 157 indexer.Close() 158 return nil, os.RemoveAll(path) 159 } 160 161 return indexer, nil 162} 163 164func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { 165 l := tlog.FromContext(ctx) 166 167 pulls, err := db.GetPulls(e) 168 if err != nil { 169 return err 170 } 171 count := len(pulls) 172 err = ix.Index(ctx, pulls...) 173 if err != nil { 174 return err 175 } 176 l.Info("pulls indexed", "count", count) 177 return err 178} 179 180type pullData struct { 181 ID int64 `json:"id"` 182 RepoAt string `json:"repo_at"` 183 PullID int `json:"pull_id"` 184 Title string `json:"title"` 185 Body string `json:"body"` 186 State string `json:"state"` 187 AuthorDid string `json:"author_did"` 188 Labels []string `json:"labels"` 189 190 Comments []pullCommentData `json:"comments"` 191} 192 193func makePullData(pull *models.Pull) *pullData { 194 return &pullData{ 195 ID: int64(pull.ID), 196 RepoAt: pull.RepoAt.String(), 197 PullID: pull.PullId, 198 Title: pull.Title, 199 Body: pull.Body, 200 State: pull.State.String(), 201 AuthorDid: pull.OwnerDid, 202 Labels: pull.Labels.LabelNames(), 203 } 204} 205 206// Type returns the document type, for bleve's mapping.Classifier interface. 207func (i *pullData) Type() string { 208 return pullIndexerDocType 209} 210 211type pullCommentData struct { 212 Body string `json:"body"` 213} 214 215type searchResult struct { 216 Hits []int64 217 Total uint64 218} 219 220const maxBatchSize = 20 221 222func (ix *Indexer) Index(ctx context.Context, pulls ...*models.Pull) error { 223 batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) 224 for _, pull := range pulls { 225 pullData := makePullData(pull) 226 if err := batch.Index(base36.Encode(pullData.ID), pullData); err != nil { 227 return err 228 } 229 } 230 return batch.Flush() 231} 232 233func (ix *Indexer) Delete(ctx context.Context, pullID int64) error { 234 return ix.indexer.Delete(base36.Encode(pullID)) 235} 236 237func (ix *Indexer) Search(ctx context.Context, opts models.PullSearchOptions) (*searchResult, error) { 238 var musts []query.Query 239 var mustNots []query.Query 240 241 // TODO(boltless): remove this after implementing pulls page pagination 242 limit := opts.Page.Limit 243 if limit == 0 { 244 limit = 500 245 } 246 247 for _, keyword := range opts.Keywords { 248 musts = append(musts, bleve.NewDisjunctionQuery( 249 bleveutil.MatchAndQuery("title", keyword, pullIndexerAnalyzer, 0), 250 bleveutil.MatchAndQuery("body", keyword, pullIndexerAnalyzer, 0), 251 )) 252 } 253 254 for _, phrase := range opts.Phrases { 255 musts = append(musts, bleve.NewDisjunctionQuery( 256 bleveutil.MatchPhraseQuery("title", phrase, pullIndexerAnalyzer), 257 bleveutil.MatchPhraseQuery("body", phrase, pullIndexerAnalyzer), 258 )) 259 } 260 261 for _, keyword := range opts.NegatedKeywords { 262 mustNots = append(mustNots, bleve.NewDisjunctionQuery( 263 bleveutil.MatchAndQuery("title", keyword, pullIndexerAnalyzer, 0), 264 bleveutil.MatchAndQuery("body", keyword, pullIndexerAnalyzer, 0), 265 )) 266 } 267 268 for _, phrase := range opts.NegatedPhrases { 269 mustNots = append(mustNots, bleve.NewDisjunctionQuery( 270 bleveutil.MatchPhraseQuery("title", phrase, pullIndexerAnalyzer), 271 bleveutil.MatchPhraseQuery("body", phrase, pullIndexerAnalyzer), 272 )) 273 } 274 275 musts = append(musts, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt)) 276 if opts.State != nil { 277 musts = append(musts, bleveutil.KeywordFieldQuery("state", opts.State.String())) 278 } 279 280 if opts.AuthorDid != "" { 281 musts = append(musts, bleveutil.KeywordFieldQuery("author_did", opts.AuthorDid)) 282 } 283 284 for _, label := range opts.Labels { 285 musts = append(musts, bleveutil.KeywordFieldQuery("labels", label)) 286 } 287 288 if opts.NegatedAuthorDid != "" { 289 mustNots = append(mustNots, bleveutil.KeywordFieldQuery("author_did", opts.NegatedAuthorDid)) 290 } 291 292 for _, label := range opts.NegatedLabels { 293 mustNots = append(mustNots, bleveutil.KeywordFieldQuery("labels", label)) 294 } 295 296 indexerQuery := bleve.NewBooleanQuery() 297 indexerQuery.AddMust(musts...) 298 indexerQuery.AddMustNot(mustNots...) 299 searchReq := bleve.NewSearchRequestOptions(indexerQuery, limit, opts.Page.Offset, false) 300 res, err := ix.indexer.SearchInContext(ctx, searchReq) 301 if err != nil { 302 return nil, nil 303 } 304 ret := &searchResult{ 305 Total: res.Total, 306 Hits: make([]int64, len(res.Hits)), 307 } 308 for i, hit := range res.Hits { 309 id, err := base36.Decode(hit.ID) 310 if err != nil { 311 return nil, err 312 } 313 ret.Hits[i] = id 314 } 315 return ret, nil 316}