Reprocess the github filesize search ranges which have more than 1000 items

This commit is contained in:
Haiyan Meng
2020-01-29 11:34:41 -08:00
parent 0fcb3a014c
commit 7a87c84403
3 changed files with 104 additions and 51 deletions

View File

@@ -80,6 +80,36 @@ func (gc githubCrawler) DefaultBranch(repo string) string {
func (gc githubCrawler) Crawl(ctx context.Context, func (gc githubCrawler) Crawl(ctx context.Context,
output chan<- crawler.CrawledDocument, seen utils.SeenMap) error { output chan<- crawler.CrawledDocument, seen utils.SeenMap) error {
ranges := []RangeWithin{
RangeWithin{
start: uint64(0),
end: githubMaxFileSize,
},
}
errs := make(multiError, 0)
for len(ranges) > 0 {
tailRange := ranges[len(ranges) - 1]
ranges = ranges[:(len(ranges) - 1)]
reProcessQueryRanges, err := gc.CrawlSingleRange(ctx, output, seen, tailRange.start, tailRange.end)
if err != nil {
errs = append(errs, err)
}
ranges = append(ranges, reProcessQueryRanges...)
}
if len(errs) > 0 {
return errs
}
return nil
}
func (gc githubCrawler) CrawlSingleRange(ctx context.Context,
output chan<- crawler.CrawledDocument, seen utils.SeenMap,
lowerBound, upperBound uint64) ([]RangeWithin, error) {
log.Printf("CrawlSingleRange [%d, %d]", lowerBound, upperBound)
noETagClient := GhClient{ noETagClient := GhClient{
RequestConfig: gc.client.RequestConfig, RequestConfig: gc.client.RequestConfig,
client: &http.Client{Timeout: gc.client.client.Timeout}, client: &http.Client{Timeout: gc.client.client.Timeout},
@@ -87,13 +117,16 @@ func (gc githubCrawler) Crawl(ctx context.Context,
accessToken: gc.client.accessToken, accessToken: gc.client.accessToken,
} }
var reProcessQueryRanges []RangeWithin
var ranges []string var ranges []string
var err error var err error
// Since Github returns a max of 1000 results per query, we can use // Since Github returns a max of 1000 results per query, we can use
// multiple queries that split the search space into chunks of at most // multiple queries that split the search space into chunks of at most
// 1000 files to get all of the data. // 1000 files to get all of the data.
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
ranges, err = FindRangesForRepoSearch(newCache(noETagClient, gc.query)) ranges, err = FindRangesForRepoSearch(newCache(noETagClient, gc.query),
lowerBound, upperBound)
if err == nil { if err == nil {
logger.Printf("FindRangesForRepoSearch succeeded after %d retries", i) logger.Printf("FindRangesForRepoSearch succeeded after %d retries", i)
break break
@@ -102,7 +135,7 @@ func (gc githubCrawler) Crawl(ctx context.Context,
} }
} }
if err != nil { if err != nil {
return fmt.Errorf("could not split %v into ranges, %v\n", return reProcessQueryRanges, fmt.Errorf("could not split %v into ranges, %v\n",
gc.query, err) gc.query, err)
} }
@@ -112,20 +145,23 @@ func (gc githubCrawler) Crawl(ctx context.Context,
errs := make(multiError, 0) errs := make(multiError, 0)
queryResult := RangeQueryResult{} queryResult := RangeQueryResult{}
for _, query := range ranges { for _, query := range ranges {
rangeResult, err := processQuery(ctx, gc.client, query, output, seen, gc.branchMap) reProcessQuery, rangeResult, err := processQuery(ctx, gc.client, query, output, seen, gc.branchMap)
if err != nil { if err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
queryResult.Add(rangeResult) queryResult.Add(rangeResult)
if reProcessQuery {
reProcessQueryRanges = append(reProcessQueryRanges, RangeSizes(query))
}
} }
logger.Printf("Summary of Crawl: %s", queryResult.String()) logger.Printf("Summary of Crawl: %s", queryResult.String())
if len(errs) > 0 { if len(errs) > 0 {
return errs return reProcessQueryRanges, errs
} }
return nil return reProcessQueryRanges, nil
} }
// FetchDocument first tries to fetch the document with d.FilePath. If it fails, // FetchDocument first tries to fetch the document with d.FilePath. If it fails,
@@ -225,7 +261,7 @@ func (r *RangeQueryResult) String() string {
// documents from the crawl to the datastore/index. // documents from the crawl to the datastore/index.
func processQuery(ctx context.Context, gcl GhClient, query string, func processQuery(ctx context.Context, gcl GhClient, query string,
output chan<- crawler.CrawledDocument, seen utils.SeenMap, output chan<- crawler.CrawledDocument, seen utils.SeenMap,
branchMap map[string]string) (RangeQueryResult, error) { branchMap map[string]string) (bool, RangeQueryResult, error) {
queryPages := make(chan GhResponseInfo) queryPages := make(chan GhResponseInfo)
@@ -241,6 +277,8 @@ func processQuery(ctx context.Context, gcl GhClient, query string,
close(queryPages) close(queryPages)
}() }()
reProcessQuery := false
errs := make(multiError, 0) errs := make(multiError, 0)
result := RangeQueryResult{} result := RangeQueryResult{}
pageID := 1 pageID := 1
@@ -271,11 +309,15 @@ func processQuery(ctx context.Context, gcl GhClient, query string,
result.Add(pageResult) result.Add(pageResult)
pageID++ pageID++
if page.Parsed.TotalCount > githubMaxResultsPerQuery {
reProcessQuery = true
}
} }
logger.Printf("Summary of processQuery: %s", result.String()) logger.Printf("Summary of processQuery: %s", result.String())
return result, errs return reProcessQuery, result, errs
} }
func kustomizationResultAdapter(gcl GhClient, k GhFileSpec, seen utils.SeenMap, func kustomizationResultAdapter(gcl GhClient, k GhFileSpec, seen utils.SeenMap,

View File

@@ -100,6 +100,8 @@ package github
import ( import (
"fmt" "fmt"
"math/bits" "math/bits"
"strconv"
"strings"
) )
// Files cannot be more than 2^19 bytes, according to // Files cannot be more than 2^19 bytes, according to
@@ -112,7 +114,7 @@ const (
// Interface instead of struct for testing purposes. // Interface instead of struct for testing purposes.
// Not expecting to have multiple implementations. // Not expecting to have multiple implementations.
type cachedSearch interface { type cachedSearch interface {
CountResults(uint64) (uint64, error) CountResults(uint64, uint64) (uint64, error)
RequestString(filesize rangeFormatter) string RequestString(filesize rangeFormatter) string
} }
@@ -161,13 +163,13 @@ func newCache(client GhClient, query Query) githubCachedSearch {
} }
} }
func (c githubCachedSearch) CountResults(upperBound uint64) (uint64, error) { func (c githubCachedSearch) CountResults(lowerBound, upperBound uint64) (uint64, error) {
count, cached := c.cache[upperBound] count, cached := c.cache[upperBound]
if cached { if cached {
return count, nil return count, nil
} }
sizeRange := RangeWithin{0, upperBound} sizeRange := RangeWithin{lowerBound, upperBound}
rangeRequest := c.RequestString(sizeRange) rangeRequest := c.RequestString(sizeRange)
result := c.gcl.parseGithubResponse(rangeRequest) result := c.gcl.parseGithubResponse(rangeRequest)
@@ -238,8 +240,8 @@ func (c githubCachedSearch) RequestString(filesize rangeFormatter) string {
// This would mean that the search as it is could not find all files. If queries // This would mean that the search as it is could not find all files. If queries
// are sorted by last indexed, and retrieved on regular intervals, it should be // are sorted by last indexed, and retrieved on regular intervals, it should be
// sufficient to get most if not all documents. // sufficient to get most if not all documents.
func FindRangesForRepoSearch(cache cachedSearch) ([]string, error) { func FindRangesForRepoSearch(cache cachedSearch, lowerBound, upperBound uint64) ([]string, error) {
totalFiles, err := cache.CountResults(githubMaxFileSize) totalFiles, err := cache.CountResults(lowerBound, upperBound)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -247,7 +249,7 @@ func FindRangesForRepoSearch(cache cachedSearch) ([]string, error) {
if githubMaxResultsPerQuery >= totalFiles { if githubMaxResultsPerQuery >= totalFiles {
return []string{ return []string{
cache.RequestString(RangeWithin{0, githubMaxFileSize}), cache.RequestString(RangeWithin{lowerBound, upperBound}),
}, nil }, nil
} }
@@ -275,6 +277,7 @@ func FindRangesForRepoSearch(cache cachedSearch) ([]string, error) {
// range. // range.
filesAccessible := uint64(0) filesAccessible := uint64(0)
sizes := make([]uint64, 0) sizes := make([]uint64, 0)
sizes = append(sizes, lowerBound)
for filesAccessible < totalFiles { for filesAccessible < totalFiles {
target := filesAccessible + githubMaxResultsPerQuery target := filesAccessible + githubMaxResultsPerQuery
if target >= totalFiles { if target >= totalFiles {
@@ -284,22 +287,22 @@ func FindRangesForRepoSearch(cache cachedSearch) ([]string, error) {
logger.Printf("%d accessible files, next target = %d\n", logger.Printf("%d accessible files, next target = %d\n",
filesAccessible, target) filesAccessible, target)
cur, err := lowerBoundFileCount(cache, target) size, err := FindFileSize(cache, target, lowerBound, upperBound)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// If there are more than 1000 files in the next bucket, we must // If there are more than 1000 files in the next bucket, we must
// advance anyway and lose out on some files :(. // advance anyway and lose out on some files :(.
if l := len(sizes); l > 0 && sizes[l-1] == cur { if l := len(sizes); l > 0 && sizes[l-1] == size {
cur++ size++
} }
nextAccessible, err := cache.CountResults(cur) nextAccessible, err := cache.CountResults(lowerBound, size)
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"cache should be populated at %d already, got %v", "cache should be populated at %d already, got %v",
cur, err) size, err)
} }
if nextAccessible < filesAccessible { if nextAccessible < filesAccessible {
return nil, fmt.Errorf( return nil, fmt.Errorf(
@@ -309,31 +312,31 @@ func FindRangesForRepoSearch(cache cachedSearch) ([]string, error) {
filesAccessible = nextAccessible filesAccessible = nextAccessible
if nextAccessible < totalFiles { if nextAccessible < totalFiles {
sizes = append(sizes, cur) sizes = append(sizes, size)
} }
} }
sizes = append(sizes, upperBound)
return formatFilesizeRanges(cache, sizes), nil return formatFilesizeRanges(cache, sizes), nil
} }
// lowerBoundFileCount finds the filesize range from [0, return value] that has // FindFileSize finds the filesize range from [lowerBound, return value] that has
// the largest file count that is smaller than or equal to // the largest file count that is smaller than or equal to
// githubMaxResultsPerQuery. It is important to note that this returned value // githubMaxResultsPerQuery. It is important to note that this returned value
// could already be in a previous range if the next file size has more than 1000 // could already be in a previous range if the next file size has more than 1000
// results. It is left to the caller to handle this bit of logic and guarantee // results. It is left to the caller to handle this bit of logic and guarantee
// forward progession in this case. // forward progession in this case.
func lowerBoundFileCount( func FindFileSize(
cache cachedSearch, targetFileCount uint64) (uint64, error) { cache cachedSearch, targetFileCount, lowerBound, upperBound uint64) (uint64, error) {
// Binary search for file sizes that make up the next <=1000 element // Binary search for file sizes that make up the next <=1000 element
// chunk. // chunk.
cur := uint64(0) cur := lowerBound
increase := githubMaxFileSize / 2 increase := (upperBound - lowerBound) / 2
for increase > 0 { for increase > 0 {
mid := cur + increase mid := cur + increase
count, err := cache.CountResults(mid) count, err := cache.CountResults(lowerBound, mid)
if err != nil { if err != nil {
return count, err return count, err
} }
@@ -353,26 +356,24 @@ func lowerBoundFileCount(
} }
func formatFilesizeRanges(cache cachedSearch, sizes []uint64) []string { func formatFilesizeRanges(cache cachedSearch, sizes []uint64) []string {
ranges := make([]string, 0, len(sizes)+1) n := len(sizes)
if n < 2 {
if len(sizes) > 0 { return []string{}
ranges = append(ranges, cache.RequestString(
RangeLessThan{sizes[0] + 1},
))
} }
for i := 0; i < len(sizes)-1; i += 1 { ranges := make([]string, 0, n-1)
ranges = append(ranges, cache.RequestString( ranges = append(ranges, cache.RequestString(RangeWithin{sizes[0], sizes[1]}))
RangeWithin{sizes[i] + 1, sizes[i+1]}, for i := 1; i < n-1; i++ {
)) ranges = append(ranges, cache.RequestString(RangeWithin{sizes[i] + 1, sizes[i+1]}))
if i != len(sizes)-2 {
continue
}
ranges = append(ranges, cache.RequestString(
RangeGreaterThan{sizes[i+1]},
))
} }
return ranges return ranges
} }
func RangeSizes(s string) RangeWithin {
start := strings.Index(s, "+size:") + len("+size:")
end := strings.Index(s, "&")
ranges := strings.Split(s[start:end], "..")
lowerBound, _ := strconv.ParseUint(ranges[0], 10, 64)
upperBound, _ := strconv.ParseUint(ranges[1], 10, 64)
return RangeWithin{lowerBound, upperBound}
}

View File

@@ -11,7 +11,7 @@ type testCachedSearch struct {
cache map[uint64]uint64 cache map[uint64]uint64
} }
func (c testCachedSearch) CountResults(upperBound uint64) (uint64, error) { func (c testCachedSearch) CountResults(lowerBound, upperBound uint64) (uint64, error) {
log.Printf("CountResults(%05x)\n", upperBound) log.Printf("CountResults(%05x)\n", upperBound)
count, ok := c.cache[upperBound] count, ok := c.cache[upperBound]
if !ok { if !ok {
@@ -73,19 +73,29 @@ func TestRangeSplitting(t *testing.T) {
}, },
} }
requests, err := FindRangesForRepoSearch(cache) requests, err := FindRangesForRepoSearch(cache, 0, 524288)
if err != nil { if err != nil {
t.Errorf("Error while finding ranges: %v", err) t.Errorf("Error while finding ranges: %v", err)
} }
expected := []string{ expected := []string{
"<107", // cache.RequestString(RangeLessThan{0x6b}), "0..106", // cache.RequestString(RangeWithin{0x00, 0x6a}),
"107..128", // cache.RequestString(RangeWithin{0x6b, 0x80}), "107..128", // cache.RequestString(RangeWithin{0x6b, 0x80}),
"129..256", // cache.RequestString(RangeWithin{0x81, 0x100}), "129..256", // cache.RequestString(RangeWithin{0x81, 0x100}),
"257..4095", // cache.RequestString(RangeWithin{0x101, 0xfff}), "257..4095", // cache.RequestString(RangeWithin{0x101, 0xfff}),
">4095", // cache.RequestString(RangeGreaterThan{0xfff}), "4096..524288", // cache.RequestString(RangeWithin{0x1000, 0x80000}),
} }
if !reflect.DeepEqual(requests, expected) { if !reflect.DeepEqual(requests, expected) {
t.Errorf("Expected requests (%v) to equal (%v)", requests, expected) t.Errorf("Expected requests (%v) to equal (%v)", requests, expected)
} }
} }
func TestRangeSizes(t *testing.T) {
s := "https://api.github.com/search/code?q=filename:kustomization.yaml+filename:kustomization.yml" +
"+filename:kustomization+size:2365..10000&order=desc&per_page=100&sort=indexed"
returnedResult := RangeSizes(s)
expectedResult := RangeWithin{uint64(2365), uint64(10000)}
if !reflect.DeepEqual(returnedResult, expectedResult) {
t.Errorf("RangeSizes expected (%v), got (%v)",expectedResult, returnedResult)
}
}