Merge pull request #2097 from haiyanmeng/improve

Improve the efficiency of crawling github  by making sure a github file is crawled only once
This commit is contained in:
Jeff Regan
2020-01-14 13:16:55 -08:00
committed by GitHub
5 changed files with 141 additions and 44 deletions

View File

@@ -121,7 +121,7 @@ func main() {
// seen tracks the IDs of all the documents in the index.
// This helps avoid indexing a given document multiple times.
seen := make(map[string]struct{})
seen := crawler.NewSeenMap()
var mode CrawlMode
if len(os.Args) == 1 {

View File

@@ -29,7 +29,7 @@ type Crawler interface {
// Crawl returns when it is done processing. This method does not take
// ownership of the channel. The channel is write only, and it
// designates where the crawler should forward the documents.
Crawl(ctx context.Context, output chan<- CrawledDocument) error
Crawl(ctx context.Context, output chan<- CrawledDocument, seen SeenMap) error
// Get the document data given the FilePath, Repo, and Ref/Tag/Branch.
FetchDocument(context.Context, *doc.Document) error
@@ -47,6 +47,21 @@ type CrawledDocument interface {
WasCached() bool
}
type SeenMap map[string]struct{}
func (seen SeenMap) Seen(item string) bool {
_, ok := seen[item]
return ok
}
func (seen SeenMap) Add(item string) {
seen[item] = struct{}{}
}
func NewSeenMap() SeenMap {
return make(map[string]struct{})
}
type CrawlSeed []*doc.Document
type IndexFunc func(CrawledDocument, index.Mode) error
@@ -69,9 +84,9 @@ func findMatch(d *doc.Document, crawlers []Crawler) Crawler {
}
func addBranches(cdoc CrawledDocument, match Crawler, indx IndexFunc,
seen map[string]struct{}, stack *CrawlSeed) {
seen SeenMap, stack *CrawlSeed) {
seen[cdoc.ID()] = struct{}{}
seen.Add(cdoc.ID())
// Insert into index
if err := indx(cdoc, index.InsertOrUpdate); err != nil {
@@ -87,7 +102,7 @@ func addBranches(cdoc CrawledDocument, match Crawler, indx IndexFunc,
}
for _, dep := range deps {
if _, ok := seen[dep.ID()]; ok {
if seen.Seen(dep.ID()) {
continue
}
*stack = append(*stack, dep)
@@ -95,7 +110,7 @@ func addBranches(cdoc CrawledDocument, match Crawler, indx IndexFunc,
}
func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv Converter, indx IndexFunc,
seen map[string]struct{}, stack *CrawlSeed) {
seen SeenMap, stack *CrawlSeed) {
UpdatedDocCount := 0
seenDocCount := 0
@@ -118,7 +133,7 @@ func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv C
crawledDocCount++
logger.Printf("Crawling doc %d: %s %s", crawledDocCount, tail.RepositoryURL, tail.FilePath)
if _, ok := seen[tail.ID()]; ok {
if seen.Seen(tail.ID()) {
logger.Printf("this doc has been seen before")
seenDocCount++
continue
@@ -144,7 +159,8 @@ func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv C
// calling FetchDocument. Otherwise, the binary may enter into an infinite loop
// if a kustomization file points to its kustmozation root in its `resources` or
// `bases` field.
seen[tail.ID()] = struct{}{}
seen.Add(tail.ID())
if err := match.FetchDocument(ctx, tail); err != nil {
logger.Printf("FetchDocument failed on %s %s: %v",
@@ -154,7 +170,7 @@ func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv C
cdoc := &doc.KustomizationDocument{
Document: *tail,
}
seen[cdoc.ID()] = struct{}{}
seen.Add(cdoc.ID())
if err := indx(cdoc, index.Delete); err != nil {
logger.Printf("Failed to delete %s %s: %v",
cdoc.RepositoryURL, cdoc.FilePath, err)
@@ -195,7 +211,7 @@ func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv C
// CrawlFromSeed updates all the documents in seed, and crawls all the new
// documents referred in the seed.
func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler,
conv Converter, indx IndexFunc, seen map[string]struct{}) {
conv Converter, indx IndexFunc, seen SeenMap) {
// stack tracks the documents directly referred in other documents.
stack := make(CrawlSeed, 0)
@@ -231,7 +247,7 @@ func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler,
// from the seed will be processed before any other documents from the
// crawlers.
func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
crawlers []Crawler) []error {
crawlers []Crawler, seen SeenMap) []error {
errs := make([]error, len(crawlers))
wg := sync.WaitGroup{}
@@ -265,7 +281,7 @@ func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
}
}()
defer close(docs)
errs[idx] = crawler.Crawl(ctx, docs)
errs[idx] = crawler.Crawl(ctx, docs, seen)
}(i, crawler, docs) // Copies the index and the crawler
}
@@ -275,7 +291,7 @@ func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
// CrawlGithub crawls all the kustomization files on Github.
func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
indx IndexFunc, seen map[string]struct{}) {
indx IndexFunc, seen SeenMap) {
// stack tracks the documents directly referred in other documents.
stack := make(CrawlSeed, 0)
@@ -291,7 +307,7 @@ func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
for cdoc := range ch {
docCount++
logger.Printf("Processing doc %d found on Github", docCount)
if _, ok := seen[cdoc.ID()]; ok {
if seen.Seen(cdoc.ID()) {
logger.Printf("the doc has been seen before")
continue
}
@@ -306,7 +322,7 @@ func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
}()
logger.Println("processing the documents found from crawling github")
if errs := CrawlGithubRunner(ctx, ch, crawlers); errs != nil {
if errs := CrawlGithubRunner(ctx, ch, crawlers, seen); errs != nil {
for _, err := range errs {
logIfErr(err)
}

View File

@@ -75,7 +75,7 @@ func newCrawler(matchPrefix string, err error,
// Crawl implements the Crawler interface for testing.
func (c testCrawler) Crawl(_ context.Context,
output chan<- CrawledDocument) error {
output chan<- CrawledDocument, _ SeenMap) error {
for i, d := range c.docs {
isResource := true
@@ -181,8 +181,9 @@ func TestCrawlGithubRunner(t *testing.T) {
defer close(output)
defer wg.Done()
seen := NewSeenMap()
errs := CrawlGithubRunner(context.Background(),
output, test.tc)
output, test.tc, seen)
// Check that errors are returned as they should be.
if !reflect.DeepEqual(errs, test.errs) {
@@ -322,7 +323,7 @@ resources:
visited[d.ID()]++
return nil
},
make(map[string]struct{}),
NewSeenMap(),
)
if lv, lc := len(visited), len(tc.corpus); lv != lc {
t.Errorf("error: %d of %d documents visited.", lv, lc)

View File

@@ -30,6 +30,8 @@ var logger = log.New(os.Stdout, "Github Crawler: ",
type githubCrawler struct {
client GhClient
query Query
// branchMap maps github repositories to their default branches
branchMap map[string]string
}
type GhClient struct {
@@ -51,13 +53,22 @@ func NewCrawler(accessToken string, retryCount uint64, client *http.Client,
},
accessToken: accessToken,
},
query: query,
query: query,
branchMap: map[string]string{},
}
}
func (gc githubCrawler) SetDefaultBranch(repo, branch string) {
gc.branchMap[repo] = branch
}
func (gc githubCrawler) DefaultBranch(repo string) string {
return gc.branchMap[repo]
}
// Implements crawler.Crawler.
func (gc githubCrawler) Crawl(
ctx context.Context, output chan<- crawler.CrawledDocument) error {
func (gc githubCrawler) Crawl(ctx context.Context,
output chan<- crawler.CrawledDocument, seen crawler.SeenMap) error {
noETagClient := GhClient{
RequestConfig: gc.client.RequestConfig,
@@ -79,13 +90,17 @@ func (gc githubCrawler) Crawl(
// Query each range for files.
errs := make(multiError, 0)
queryResult := RangeQueryResult{}
for _, query := range ranges {
err := processQuery(ctx, gc.client, query, output)
rangeResult, err := processQuery(ctx, gc.client, query, output, seen, gc.branchMap)
if err != nil {
errs = append(errs, err)
}
queryResult.Add(rangeResult)
}
logger.Printf("Summary of Crawl: %s", queryResult.String())
if len(errs) > 0 {
return errs
}
@@ -100,7 +115,7 @@ func (gc githubCrawler) FetchDocument(_ context.Context, d *doc.Document) error
// set the default branch if it is empty
if d.DefaultBranch == "" {
url := gc.client.ReposRequest(d.RepositoryFullName())
defaultBranch, err := gc.client.GetDefaultBranch(url)
defaultBranch, err := gc.client.GetDefaultBranch(url, d.RepositoryURL, gc.branchMap)
if err != nil {
logger.Printf(
"(error: %v) setting default_branch to master\n", err)
@@ -108,6 +123,8 @@ func (gc githubCrawler) FetchDocument(_ context.Context, d *doc.Document) error
}
d.DefaultBranch = defaultBranch
}
gc.SetDefaultBranch(d.RepositoryURL, d.DefaultBranch)
repoURL := d.RepositoryURL + "/" + d.FilePath + "?ref=" + d.DefaultBranch
repoSpec, err := git.NewRepoSpecFromUrl(repoURL)
if err != nil {
@@ -176,10 +193,32 @@ func (gc githubCrawler) Match(d *doc.Document) bool {
return strings.Contains(repoSpec.Host, "github.com")
}
type RangeQueryResult struct {
totalDocCnt uint64
seenDocCnt uint64
newDocCnt uint64
errorCnt uint64
}
func (r *RangeQueryResult) Add(other RangeQueryResult) {
r.totalDocCnt += other.totalDocCnt
r.newDocCnt += other.newDocCnt
r.seenDocCnt += other.seenDocCnt
r.errorCnt += other.errorCnt
}
func (r *RangeQueryResult) String() string {
return fmt.Sprintf("got %d files from API. "+
"%d have been seen before. %d are new and sent to the output channel." +
" %d have kustomizationResultAdapter errors.",
r.totalDocCnt, r.seenDocCnt, r.newDocCnt, r.errorCnt)
}
// processQuery follows all of the pages in a query, and updates/adds the
// documents from the crawl to the datastore/index.
func processQuery(ctx context.Context, gcl GhClient, query string,
output chan<- crawler.CrawledDocument) error {
output chan<- crawler.CrawledDocument, seen crawler.SeenMap,
branchMap map[string]string) (RangeQueryResult, error) {
queryPages := make(chan GhResponseInfo)
@@ -196,50 +235,67 @@ func processQuery(ctx context.Context, gcl GhClient, query string,
}()
errs := make(multiError, 0)
errorCnt := 0
totalCnt := 0
result := RangeQueryResult{}
pageID := 1
for page := range queryPages {
if page.Error != nil {
errs = append(errs, page.Error)
continue
}
pageResult := RangeQueryResult{}
for _, file := range page.Parsed.Items {
k, err := kustomizationResultAdapter(gcl, file)
k, err := kustomizationResultAdapter(gcl, file, seen, branchMap)
if err != nil {
logger.Printf("kustomizationResultAdapter failed: %v", err)
errs = append(errs, err)
errorCnt++
pageResult.errorCnt++
}
if k != nil {
pageResult.newDocCnt++
output <- k
} else {
pageResult.seenDocCnt++
}
totalCnt++
pageResult.totalDocCnt++
}
logger.Printf("got %d files out of %d from API. %d of %d had errors\n",
totalCnt, page.Parsed.TotalCount, errorCnt, totalCnt)
logger.Printf("processQuery [TotalCount %d - page %d]: %s",
page.Parsed.TotalCount, pageID, pageResult.String())
result.Add(pageResult)
pageID++
}
return errs
logger.Printf("Summary of processQuery: %s", result.String())
return result, errs
}
func kustomizationResultAdapter(gcl GhClient, k GhFileSpec) (
crawler.CrawledDocument, error) {
data, err := gcl.GetFileData(k)
if err != nil {
return nil, err
}
func kustomizationResultAdapter(gcl GhClient, k GhFileSpec, seen crawler.SeenMap,
branchMap map[string]string) (crawler.CrawledDocument, error) {
url := gcl.ReposRequest(k.Repository.FullName)
defaultBranch, err := gcl.GetDefaultBranch(url)
defaultBranch, err := gcl.GetDefaultBranch(url, k.Repository.URL, branchMap)
if err != nil {
logger.Printf(
"(error: %v) setting default_branch to master\n", err)
defaultBranch = "master"
}
document := doc.Document{
FilePath: k.Path,
DefaultBranch: defaultBranch,
RepositoryURL: k.Repository.URL,
}
if seen.Seen(document.ID()) {
return nil, nil
}
data, err := gcl.GetFileData(k)
if err != nil {
return nil, err
}
d := doc.KustomizationDocument{
Document: doc.Document{
DocumentData: string(data),
@@ -344,7 +400,15 @@ func CloseResponseBody(resp *http.Response) {
}
}
func (gcl GhClient) GetDefaultBranch(url string) (string, error) {
// GetDefaultBranch gets the default branch of a github repository.
// m is a map which maps a github repository to its default branch.
// If repo is already in m, the default branch for url will be obtained from m;
// otherwise, a query will be made to github to obtain the default branch.
func (gcl GhClient) GetDefaultBranch(url, repo string, m map[string]string) (string, error) {
if v, ok := m[repo]; ok {
return v, nil
}
resp, err := gcl.GetReposData(url)
if err != nil {
return "", fmt.Errorf(
@@ -589,7 +653,7 @@ func (gcl GhClient) Do(query string) (*http.Response, error) {
// gcl.client.Do: a non-2xx status code doesn't cause an error.
// See https://golang.org/pkg/net/http/#Client.Do for more info.
resp, err := gcl.client.Do(req)
resp, err := gcl.client.Do(req)
if resp != nil && resp.StatusCode != http.StatusOK {
err = fmt.Errorf("GhClient.Do(%s) failed with response code: %d",
query, resp.StatusCode)

View File

@@ -63,4 +63,20 @@ curl -X GET "${ElasticSearchURL}:9200/kustomize/_search?pretty" -H 'Content-Type
}
}
'
```
Search all the documents whose filePath does not end with any of these following
three filenames: `kustomization.yaml`, `kustomization.yml`, `kustomization`:
```
curl -X GET "${ElasticSearchURL}:9200/kustomize/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"bool": {
"must_not": [
{ "regexp": { "filePath": ".*/kustomization((.yaml)?|(.yml)?)/*" }}
]
}
}
}
'
```