Improve the efficency of crawling github

Make sure a github file is crawled once
This commit is contained in:
Haiyan Meng
2020-01-13 14:56:47 -08:00
parent d71d2df364
commit 81d62f90bf
3 changed files with 96 additions and 32 deletions

View File

@@ -29,7 +29,7 @@ type Crawler interface {
// Crawl returns when it is done processing. This method does not take
// ownership of the channel. The channel is write only, and it
// designates where the crawler should forward the documents.
Crawl(ctx context.Context, output chan<- CrawledDocument) error
Crawl(ctx context.Context, output chan<- CrawledDocument, seen map[string]struct{}) error
// Get the document data given the FilePath, Repo, and Ref/Tag/Branch.
FetchDocument(context.Context, *doc.Document) error
@@ -231,7 +231,7 @@ func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler,
// from the seed will be processed before any other documents from the
// crawlers.
func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
crawlers []Crawler) []error {
crawlers []Crawler, seen map[string]struct{}) []error {
errs := make([]error, len(crawlers))
wg := sync.WaitGroup{}
@@ -265,7 +265,7 @@ func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
}
}()
defer close(docs)
errs[idx] = crawler.Crawl(ctx, docs)
errs[idx] = crawler.Crawl(ctx, docs, seen)
}(i, crawler, docs) // Copies the index and the crawler
}
@@ -306,7 +306,7 @@ func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
}()
logger.Println("processing the documents found from crawling github")
if errs := CrawlGithubRunner(ctx, ch, crawlers); errs != nil {
if errs := CrawlGithubRunner(ctx, ch, crawlers, seen); errs != nil {
for _, err := range errs {
logIfErr(err)
}

View File

@@ -75,7 +75,7 @@ func newCrawler(matchPrefix string, err error,
// Crawl implements the Crawler interface for testing.
func (c testCrawler) Crawl(_ context.Context,
output chan<- CrawledDocument) error {
output chan<- CrawledDocument, _ map[string]struct{}) error {
for i, d := range c.docs {
isResource := true
@@ -181,8 +181,9 @@ func TestCrawlGithubRunner(t *testing.T) {
defer close(output)
defer wg.Done()
seen := map[string]struct{}{}
errs := CrawlGithubRunner(context.Background(),
output, test.tc)
output, test.tc, seen)
// Check that errors are returned as they should be.
if !reflect.DeepEqual(errs, test.errs) {

View File

@@ -30,6 +30,8 @@ var logger = log.New(os.Stdout, "Github Crawler: ",
type githubCrawler struct {
client GhClient
query Query
// branchMap maps github repositories to their default branches
branchMap map[string]string
}
type GhClient struct {
@@ -52,12 +54,21 @@ func NewCrawler(accessToken string, retryCount uint64, client *http.Client,
accessToken: accessToken,
},
query: query,
branchMap: map[string]string{},
}
}
func (gc githubCrawler) SetDefaultBranch(repo, branch string) {
gc.branchMap[repo] = branch
}
func (gc githubCrawler) DefaultBranch(repo string) string {
return gc.branchMap[repo]
}
// Implements crawler.Crawler.
func (gc githubCrawler) Crawl(
ctx context.Context, output chan<- crawler.CrawledDocument) error {
func (gc githubCrawler) Crawl(ctx context.Context,
output chan<- crawler.CrawledDocument, seen map[string]struct{}) error {
noETagClient := GhClient{
RequestConfig: gc.client.RequestConfig,
@@ -79,17 +90,26 @@ func (gc githubCrawler) Crawl(
// Query each range for files.
errs := make(multiError, 0)
queryResult := RangeQueryResult{}
for _, query := range ranges {
err := processQuery(ctx, gc.client, query, output)
result, err := processQuery(ctx, gc.client, query, output, seen, gc.branchMap)
if err != nil {
errs = append(errs, err)
}
queryResult.totalDocCnt += result.totalDocCnt
queryResult.seenDocCnt += result.seenDocCnt
queryResult.newDocCnt += result.newDocCnt
queryResult.errorCnt += result.errorCnt
}
if len(errs) > 0 {
return errs
}
logger.Printf("Summary of Crawl: got %d files from Github. "+
"%d have been seen before. %d are new and sent to the output channel." +
"%d have kustomizationResultAdapter errors.",
queryResult.totalDocCnt, queryResult.seenDocCnt,
queryResult.newDocCnt, queryResult.errorCnt)
return nil
}
@@ -100,7 +120,7 @@ func (gc githubCrawler) FetchDocument(_ context.Context, d *doc.Document) error
// set the default branch if it is empty
if d.DefaultBranch == "" {
url := gc.client.ReposRequest(d.RepositoryFullName())
defaultBranch, err := gc.client.GetDefaultBranch(url)
defaultBranch, err := gc.client.GetDefaultBranch(url, d.RepositoryURL, gc.branchMap)
if err != nil {
logger.Printf(
"(error: %v) setting default_branch to master\n", err)
@@ -108,6 +128,8 @@ func (gc githubCrawler) FetchDocument(_ context.Context, d *doc.Document) error
}
d.DefaultBranch = defaultBranch
}
gc.SetDefaultBranch(d.RepositoryURL, d.DefaultBranch)
repoURL := d.RepositoryURL + "/" + d.FilePath + "?ref=" + d.DefaultBranch
repoSpec, err := git.NewRepoSpecFromUrl(repoURL)
if err != nil {
@@ -176,10 +198,18 @@ func (gc githubCrawler) Match(d *doc.Document) bool {
return strings.Contains(repoSpec.Host, "github.com")
}
type RangeQueryResult struct {
totalDocCnt uint64
seenDocCnt uint64
newDocCnt uint64
errorCnt uint64
}
// processQuery follows all of the pages in a query, and updates/adds the
// documents from the crawl to the datastore/index.
func processQuery(ctx context.Context, gcl GhClient, query string,
output chan<- crawler.CrawledDocument) error {
output chan<- crawler.CrawledDocument, seen map[string]struct{},
branchMap map[string]string) (RangeQueryResult, error) {
queryPages := make(chan GhResponseInfo)
@@ -196,50 +226,75 @@ func processQuery(ctx context.Context, gcl GhClient, query string,
}()
errs := make(multiError, 0)
errorCnt := 0
totalCnt := 0
result := RangeQueryResult{}
pageID := 1
for page := range queryPages {
if page.Error != nil {
errs = append(errs, page.Error)
continue
}
var errorCnt, seenDocCnt, newDocCnt, totalDocCnt uint64
for _, file := range page.Parsed.Items {
k, err := kustomizationResultAdapter(gcl, file)
k, err := kustomizationResultAdapter(gcl, file, seen, branchMap)
if err != nil {
logger.Printf("kustomizationResultAdapter failed: %v", err)
errs = append(errs, err)
errorCnt++
}
if k != nil {
newDocCnt++
output <- k
} else {
seenDocCnt++
}
totalCnt++
totalDocCnt++
}
logger.Printf("got %d files out of %d from API. %d of %d had errors\n",
totalCnt, page.Parsed.TotalCount, errorCnt, totalCnt)
logger.Printf("processQuery [page %d]: got %d files out of %d from API. "+
"%d have been seen before. %d are new and sent to the output channel." +
"%d have kustomizationResultAdapter errors.",
pageID, totalDocCnt, page.Parsed.TotalCount, seenDocCnt, newDocCnt, errorCnt)
result.totalDocCnt += totalDocCnt
result.seenDocCnt += seenDocCnt
result.newDocCnt += newDocCnt
result.errorCnt += errorCnt
pageID++
}
return errs
logger.Printf("Summary of processQuery: got %d files from API. "+
"%d have been seen before. %d are new and sent to the output channel." +
" %d have kustomizationResultAdapter errors.",
result.totalDocCnt, result.seenDocCnt, result.newDocCnt, result.errorCnt)
return result, errs
}
func kustomizationResultAdapter(gcl GhClient, k GhFileSpec) (
crawler.CrawledDocument, error) {
data, err := gcl.GetFileData(k)
if err != nil {
return nil, err
}
func kustomizationResultAdapter(gcl GhClient, k GhFileSpec, seen map[string]struct{},
branchMap map[string]string) (crawler.CrawledDocument, error) {
url := gcl.ReposRequest(k.Repository.FullName)
defaultBranch, err := gcl.GetDefaultBranch(url)
defaultBranch, err := gcl.GetDefaultBranch(url, k.Repository.URL, branchMap)
if err != nil {
logger.Printf(
"(error: %v) setting default_branch to master\n", err)
defaultBranch = "master"
}
document := doc.Document{
FilePath: k.Path,
DefaultBranch: defaultBranch,
RepositoryURL: k.Repository.URL,
}
if _, ok := seen[document.ID()]; ok {
return nil, nil
}
data, err := gcl.GetFileData(k)
if err != nil {
return nil, err
}
d := doc.KustomizationDocument{
Document: doc.Document{
DocumentData: string(data),
@@ -344,7 +399,15 @@ func CloseResponseBody(resp *http.Response) {
}
}
func (gcl GhClient) GetDefaultBranch(url string) (string, error) {
// GetDefaultBranch gets the default branch of a github repository.
// m is a map which maps a github repository to its default branch.
// If repo is already in m, the default branch for url will be obtained from m;
// otherwise, a query will be made to github to obtain the default branch.
func (gcl GhClient) GetDefaultBranch(url, repo string, m map[string]string) (string, error) {
if v, ok := m[repo]; ok {
return v, nil
}
resp, err := gcl.GetReposData(url)
if err != nil {
return "", fmt.Errorf(