Merge pull request #2638 from haiyanmeng/mem

Avoid reprocess queries whose range size is 0 and improve memory usage
This commit is contained in:
Jeff Regan
2020-06-25 10:58:20 -07:00
committed by GitHub
6 changed files with 86 additions and 45 deletions

View File

@@ -1,4 +1,4 @@
FROM golang:1.11 AS build
FROM golang:1.14 AS build
ARG GO111MODULE=on

View File

@@ -159,40 +159,31 @@ func main() {
}
}
seedDocs := make(crawler.CrawlSeed, 0)
// get all the documents in the index
getSeedDocsFunc := func() {
query := []byte(`{ "query":{ "match_all":{} } }`)
it := idx.IterateQuery(query, 10000, 60*time.Second)
for it.Next() {
for _, hit := range it.Value().Hits.Hits {
seedDocs = append(seedDocs, hit.Document.Document.Copy())
}
}
if err := it.Err(); err != nil {
log.Fatalf("getSeedDocsFunc Error iterating: %v\n", err)
}
}
query := []byte(`{ "query":{ "match_all":{} } }`)
it := idx.IterateQuery(query, 10000, 60*time.Second)
switch mode {
case CrawlIndexAndGithub:
getSeedDocsFunc()
crawlers := []crawler.Crawler{ghCrawlerConstructor("", "")}
crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, indexFunc, seen)
crawler.CrawlFromSeedIterator(ctx, it, crawlers, docConverter, indexFunc, seen)
crawler.CrawlGithub(ctx, crawlers, docConverter, indexFunc, seen)
case CrawlIndex:
getSeedDocsFunc()
crawlers := []crawler.Crawler{ghCrawlerConstructor("", "")}
crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, indexFunc, seen)
crawler.CrawlFromSeedIterator(ctx, it, crawlers, docConverter, indexFunc, seen)
case CrawlGithub:
crawlers := []crawler.Crawler{ghCrawlerConstructor("", "")}
// add all the documents in the index into seen.
// this greatly reduces the time overhead of CrawlGithub.
getSeedDocsFunc()
for _, d := range seedDocs {
seen[d.ID()] = d.FileType
for it.Next() {
for _, hit := range it.Value().Hits.Hits {
d := hit.Document.Document
seen.Set(d.ID(), d.FileType)
}
}
if err := it.Err(); err != nil {
log.Fatalf("Error iterating the index: %v\n", err)
}
crawler.CrawlGithub(ctx, crawlers, docConverter, indexFunc, seen)
case CrawlUser:
if *githubUserPtr == "" {

View File

@@ -213,21 +213,36 @@ func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv C
logger.Printf("\t%d documents cannot be converted but still were inserted or updated in the index\n", convErrCount)
}
// CrawlFromSeedIterator iterates all the documents in the index and call CrawlFromSeed for each document.
func CrawlFromSeedIterator(ctx context.Context, it *index.KustomizeIterator, crawlers []Crawler,
conv Converter, indx IndexFunc, seen utils.SeenMap) {
docCount := 0
for it.Next() {
for _, hit := range it.Value().Hits.Hits {
docCount++
logger.Printf("updating document %d from seed\n", docCount)
singleSeed := CrawlSeed{&(hit.Document.Document)}
CrawlFromSeed(ctx, singleSeed, crawlers, conv, indx, seen)
}
}
if err := it.Err(); err != nil {
log.Fatalf("Error iterating the index: %v\n", err)
}
}
// CrawlFromSeed updates all the documents in seed, and crawls all the new
// documents referred in the seed.
func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler,
conv Converter, indx IndexFunc, seen utils.SeenMap) {
// stack tracks the documents directly referred in other documents.
// stack tracks the documents directly referred in the seed.
stack := make(CrawlSeed, 0)
// Exploit seed to update bulk of corpus.
logger.Printf("updating %d documents from seed\n", len(seed))
// each unique document in seed will be crawled once.
doCrawl(ctx, &seed, crawlers, conv, indx, seen, &stack, true, false)
// Traverse any new documents added while updating corpus.
logger.Printf("crawling %d new documents found in the seed\n", len(stack))
logger.Printf("crawling %d new documents referred by doc\n", len(stack))
// While crawling each document in stack, the documents directly referred in the document
// will be added into stack.
// After this statement is done, stack will become empty.
@@ -297,8 +312,6 @@ func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
// CrawlGithub crawls all the kustomization files on Github.
func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
indx IndexFunc, seen utils.SeenMap) {
// stack tracks the documents directly referred in other documents.
stack := make(CrawlSeed, 0)
// ch is channel where all the crawlers sends the crawled documents to.
ch := make(chan CrawledDocument, 1<<10)
@@ -324,7 +337,20 @@ func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
"%v could not match any crawler", cdoc))
continue
}
// stack tracks the documents directly referred in the document.
stack := make(CrawlSeed, 0)
addBranches(cdoc, match, indx, seen, &stack)
if len(stack) > 0 {
// here the documents referred in a kustomization file are crawled separately,
// to avoid accumulating all the referred documents into a single gigantic
// mem-inentive stack.
logger.Printf("crawling the %d new documents referred in doc %d",
len(stack), docCount)
doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack, false, true)
}
}
}()
@@ -336,9 +362,4 @@ func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
}
close(ch)
wg.Wait()
// Handle deps of newly discovered documents.
logger.Printf("crawling the %d new documents referred by other documents",
len(stack))
doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack, false, true)
}

View File

@@ -82,14 +82,15 @@ func (gc githubCrawler) Crawl(ctx context.Context,
ranges := []RangeWithin{
RangeWithin{
start: uint64(0),
end: githubMaxFileSize,
},
start: uint64(0),
end: githubMaxFileSize,
},
}
errs := make(multiError, 0)
for len(ranges) > 0 {
tailRange := ranges[len(ranges) - 1]
logger.Printf("Current ranges: %v (len: %d)\n", ranges, len(ranges))
tailRange := ranges[len(ranges)-1]
ranges = ranges[:(len(ranges) - 1)]
reProcessQueryRanges, err := gc.CrawlSingleRange(ctx, output, seen, tailRange.start, tailRange.end)
if err != nil {
@@ -151,7 +152,15 @@ func (gc githubCrawler) CrawlSingleRange(ctx context.Context,
}
queryResult.Add(rangeResult)
if reProcessQuery {
reProcessQueryRanges = append(reProcessQueryRanges, RangeSizes(query))
// if the size of a range is 0, such as [245, 245], and reProcessQuery is true,
// it means that there are more than 1000 results for the query range.
// Reprocessing the query range will not help because the GitHub Search API
// only provides up to 1,000 results for each search.
if RangeSizes(query).Size() == 0 {
logger.Printf("range size is 0 includes more than 1000 results: %s", query)
} else {
reProcessQueryRanges = append(reProcessQueryRanges, RangeSizes(query))
}
}
}

View File

@@ -225,3 +225,7 @@ type RangeWithin struct {
func (r RangeWithin) RangeString() string {
return fmt.Sprintf("%d..%d", r.start, r.end)
}
func (r RangeWithin) Size() uint64 {
return r.end - r.start
}

View File

@@ -1,21 +1,37 @@
package utils
type SeenMap map[string]string
import "sync"
type SeenMap struct {
data map[string]string
lock sync.RWMutex
}
// TODO: add lock to avoid race condition
func (seen SeenMap) Seen(item string) bool {
_, ok := seen[item]
seen.lock.RLock()
_, ok := seen.data[item]
seen.lock.RUnlock()
return ok
}
func (seen SeenMap) Set(k, v string) {
seen[k] = v
seen.lock.Lock()
seen.data[k] = v
seen.lock.Unlock()
}
// The caller should make sure that key is in the map.
func (seen SeenMap) Value(k string) string {
return seen[k]
seen.lock.RLock()
v := seen.data[k]
seen.lock.RUnlock()
return v
}
func NewSeenMap() SeenMap {
return make(map[string]string)
return SeenMap{
data: make(map[string]string),
lock: sync.RWMutex{},
}
}