From 171412cc988390c10b70c324d494932f007042fe Mon Sep 17 00:00:00 2001 From: Haiyan Meng Date: Sun, 14 Jun 2020 13:40:08 -0700 Subject: [PATCH 1/4] Use RWMutex to control the map access Without RWMutex, we may run into fatal error: concurrent map read and map write. --- api/internal/crawl/cmd/crawler/crawler.go | 2 +- api/internal/crawl/utils/utils.go | 26 ++++++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/api/internal/crawl/cmd/crawler/crawler.go b/api/internal/crawl/cmd/crawler/crawler.go index d87c47996..aa231cdf4 100644 --- a/api/internal/crawl/cmd/crawler/crawler.go +++ b/api/internal/crawl/cmd/crawler/crawler.go @@ -191,7 +191,7 @@ func main() { // this greatly reduces the time overhead of CrawlGithub. getSeedDocsFunc() for _, d := range seedDocs { - seen[d.ID()] = d.FileType + seen.Set(d.ID(), d.FileType) } crawler.CrawlGithub(ctx, crawlers, docConverter, indexFunc, seen) case CrawlUser: diff --git a/api/internal/crawl/utils/utils.go b/api/internal/crawl/utils/utils.go index d6b6fab68..f79454003 100644 --- a/api/internal/crawl/utils/utils.go +++ b/api/internal/crawl/utils/utils.go @@ -1,21 +1,37 @@ package utils -type SeenMap map[string]string +import "sync" +type SeenMap struct { + data map[string]string + lock sync.RWMutex +} + +// TODO: add lock to avoid race condition func (seen SeenMap) Seen(item string) bool { - _, ok := seen[item] + seen.lock.RLock() + _, ok := seen.data[item] + seen.lock.RUnlock() return ok } func (seen SeenMap) Set(k, v string) { - seen[k] = v + seen.lock.Lock() + seen.data[k] = v + seen.lock.Unlock() } // The caller should make sure that key is in the map. func (seen SeenMap) Value(k string) string { - return seen[k] + seen.lock.RLock() + v := seen.data[k] + seen.lock.RUnlock() + return v } func NewSeenMap() SeenMap { - return make(map[string]string) + return SeenMap{ + data: make(map[string]string), + lock: sync.RWMutex{}, + } } From 2d496e0efe0f670e6e16bd239a32e92421eae2fe Mon Sep 17 00:00:00 2001 From: Haiyan Meng Date: Mon, 15 Jun 2020 09:51:37 -0700 Subject: [PATCH 2/4] Update golang to 1.14 --- api/internal/crawl/cmd/crawler/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/internal/crawl/cmd/crawler/Dockerfile b/api/internal/crawl/cmd/crawler/Dockerfile index 4a8ab957b..5aa359f8f 100644 --- a/api/internal/crawl/cmd/crawler/Dockerfile +++ b/api/internal/crawl/cmd/crawler/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.11 AS build +FROM golang:1.14 AS build ARG GO111MODULE=on From a83433d5cf91103865893d45ab4bfa75e28edf82 Mon Sep 17 00:00:00 2001 From: Haiyan Meng Date: Mon, 15 Jun 2020 15:10:35 -0700 Subject: [PATCH 3/4] Optimize memory usage by avoiding accumulating all the referred documents into a single stack. --- api/internal/crawl/cmd/crawler/crawler.go | 35 +++++++----------- api/internal/crawl/crawler/crawler.go | 45 +++++++++++++++++------ 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/api/internal/crawl/cmd/crawler/crawler.go b/api/internal/crawl/cmd/crawler/crawler.go index aa231cdf4..fb31ad43c 100644 --- a/api/internal/crawl/cmd/crawler/crawler.go +++ b/api/internal/crawl/cmd/crawler/crawler.go @@ -159,40 +159,31 @@ func main() { } } - seedDocs := make(crawler.CrawlSeed, 0) - - // get all the documents in the index - getSeedDocsFunc := func() { - query := []byte(`{ "query":{ "match_all":{} } }`) - it := idx.IterateQuery(query, 10000, 60*time.Second) - for it.Next() { - for _, hit := range it.Value().Hits.Hits { - seedDocs = append(seedDocs, hit.Document.Document.Copy()) - } - } - if err := it.Err(); err != nil { - log.Fatalf("getSeedDocsFunc Error iterating: %v\n", err) - } - } + query := []byte(`{ "query":{ "match_all":{} } }`) + it := idx.IterateQuery(query, 10000, 60*time.Second) switch mode { case CrawlIndexAndGithub: - getSeedDocsFunc() crawlers := []crawler.Crawler{ghCrawlerConstructor("", "")} - crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, indexFunc, seen) + crawler.CrawlFromSeedIterator(ctx, it, crawlers, docConverter, indexFunc, seen) crawler.CrawlGithub(ctx, crawlers, docConverter, indexFunc, seen) case CrawlIndex: - getSeedDocsFunc() crawlers := []crawler.Crawler{ghCrawlerConstructor("", "")} - crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, indexFunc, seen) + crawler.CrawlFromSeedIterator(ctx, it, crawlers, docConverter, indexFunc, seen) case CrawlGithub: crawlers := []crawler.Crawler{ghCrawlerConstructor("", "")} // add all the documents in the index into seen. // this greatly reduces the time overhead of CrawlGithub. - getSeedDocsFunc() - for _, d := range seedDocs { - seen.Set(d.ID(), d.FileType) + for it.Next() { + for _, hit := range it.Value().Hits.Hits { + d := hit.Document.Document + seen.Set(d.ID(), d.FileType) + } } + if err := it.Err(); err != nil { + log.Fatalf("Error iterating the index: %v\n", err) + } + crawler.CrawlGithub(ctx, crawlers, docConverter, indexFunc, seen) case CrawlUser: if *githubUserPtr == "" { diff --git a/api/internal/crawl/crawler/crawler.go b/api/internal/crawl/crawler/crawler.go index 7488fcf3d..a3720675f 100644 --- a/api/internal/crawl/crawler/crawler.go +++ b/api/internal/crawl/crawler/crawler.go @@ -213,21 +213,36 @@ func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv C logger.Printf("\t%d documents cannot be converted but still were inserted or updated in the index\n", convErrCount) } +// CrawlFromSeedIterator iterates all the documents in the index and call CrawlFromSeed for each document. +func CrawlFromSeedIterator(ctx context.Context, it *index.KustomizeIterator, crawlers []Crawler, + conv Converter, indx IndexFunc, seen utils.SeenMap) { + docCount := 0 + for it.Next() { + for _, hit := range it.Value().Hits.Hits { + docCount++ + logger.Printf("updating document %d from seed\n", docCount) + + singleSeed := CrawlSeed{&(hit.Document.Document)} + CrawlFromSeed(ctx, singleSeed, crawlers, conv, indx, seen) + } + } + if err := it.Err(); err != nil { + log.Fatalf("Error iterating the index: %v\n", err) + } +} + // CrawlFromSeed updates all the documents in seed, and crawls all the new // documents referred in the seed. func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler, conv Converter, indx IndexFunc, seen utils.SeenMap) { - // stack tracks the documents directly referred in other documents. + // stack tracks the documents directly referred in the seed. stack := make(CrawlSeed, 0) - // Exploit seed to update bulk of corpus. - logger.Printf("updating %d documents from seed\n", len(seed)) // each unique document in seed will be crawled once. doCrawl(ctx, &seed, crawlers, conv, indx, seen, &stack, true, false) - // Traverse any new documents added while updating corpus. - logger.Printf("crawling %d new documents found in the seed\n", len(stack)) + logger.Printf("crawling %d new documents referred by doc\n", len(stack)) // While crawling each document in stack, the documents directly referred in the document // will be added into stack. // After this statement is done, stack will become empty. @@ -297,8 +312,6 @@ func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument, // CrawlGithub crawls all the kustomization files on Github. func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter, indx IndexFunc, seen utils.SeenMap) { - // stack tracks the documents directly referred in other documents. - stack := make(CrawlSeed, 0) // ch is channel where all the crawlers sends the crawled documents to. ch := make(chan CrawledDocument, 1<<10) @@ -324,7 +337,20 @@ func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter, "%v could not match any crawler", cdoc)) continue } + + // stack tracks the documents directly referred in the document. + stack := make(CrawlSeed, 0) + addBranches(cdoc, match, indx, seen, &stack) + + if len(stack) > 0 { + // here the documents referred in a kustomization file are crawled separately, + // to avoid accumulating all the referred documents into a single gigantic + // mem-inentive stack. + logger.Printf("crawling the %d new documents referred in doc %d", + len(stack), docCount) + doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack, false, true) + } } }() @@ -336,9 +362,4 @@ func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter, } close(ch) wg.Wait() - - // Handle deps of newly discovered documents. - logger.Printf("crawling the %d new documents referred by other documents", - len(stack)) - doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack, false, true) } From 145ba0c7ff181767ebd25d0086a6cfca068f49be Mon Sep 17 00:00:00 2001 From: Haiyan Meng Date: Thu, 18 Jun 2020 09:07:57 -0700 Subject: [PATCH 4/4] Avoid reprocess queries whose range size is 0 --- api/internal/crawl/crawler/github/crawler.go | 19 ++++++++++++++----- api/internal/crawl/crawler/github/queries.go | 4 ++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/api/internal/crawl/crawler/github/crawler.go b/api/internal/crawl/crawler/github/crawler.go index a749c6784..d668a86b4 100644 --- a/api/internal/crawl/crawler/github/crawler.go +++ b/api/internal/crawl/crawler/github/crawler.go @@ -82,14 +82,15 @@ func (gc githubCrawler) Crawl(ctx context.Context, ranges := []RangeWithin{ RangeWithin{ - start: uint64(0), - end: githubMaxFileSize, - }, + start: uint64(0), + end: githubMaxFileSize, + }, } errs := make(multiError, 0) for len(ranges) > 0 { - tailRange := ranges[len(ranges) - 1] + logger.Printf("Current ranges: %v (len: %d)\n", ranges, len(ranges)) + tailRange := ranges[len(ranges)-1] ranges = ranges[:(len(ranges) - 1)] reProcessQueryRanges, err := gc.CrawlSingleRange(ctx, output, seen, tailRange.start, tailRange.end) if err != nil { @@ -151,7 +152,15 @@ func (gc githubCrawler) CrawlSingleRange(ctx context.Context, } queryResult.Add(rangeResult) if reProcessQuery { - reProcessQueryRanges = append(reProcessQueryRanges, RangeSizes(query)) + // if the size of a range is 0, such as [245, 245], and reProcessQuery is true, + // it means that there are more than 1000 results for the query range. + // Reprocessing the query range will not help because the GitHub Search API + // only provides up to 1,000 results for each search. + if RangeSizes(query).Size() == 0 { + logger.Printf("range size is 0 includes more than 1000 results: %s", query) + } else { + reProcessQueryRanges = append(reProcessQueryRanges, RangeSizes(query)) + } } } diff --git a/api/internal/crawl/crawler/github/queries.go b/api/internal/crawl/crawler/github/queries.go index 444ee3d10..df2018efa 100644 --- a/api/internal/crawl/crawler/github/queries.go +++ b/api/internal/crawl/crawler/github/queries.go @@ -225,3 +225,7 @@ type RangeWithin struct { func (r RangeWithin) RangeString() string { return fmt.Sprintf("%d..%d", r.start, r.end) } + +func (r RangeWithin) Size() uint64 { + return r.end - r.start +}