Merge pull request #1948 from haiyanmeng/expose-es

Add supports for crawling a specific git user or repo
This commit is contained in:
Jeff Regan
2019-12-13 13:24:03 -08:00
committed by GitHub
7 changed files with 265 additions and 196 deletions

View File

@@ -21,9 +21,14 @@ const (
redisCacheURL = "REDIS_CACHE_URL"
redisKeyURL = "REDIS_KEY_URL"
retryCount = 3
githubUserEnv = "GITHUB_USER"
githubRepoEnv = "GITHUB_REPO"
)
func main() {
githubUser := os.Getenv(githubUserEnv)
githubRepo := os.Getenv(githubRepoEnv)
githubToken := os.Getenv(githubAccessTokenVar)
if githubToken == "" {
fmt.Printf("Must set the variable '%s' to make github requests.\n",
@@ -38,21 +43,9 @@ func main() {
return
}
seedDocs := make(crawler.CrawlSeed, 0)
cacheURL := os.Getenv(redisCacheURL)
query := []byte(`{ "query":{ "match_all":{} } }`)
it := idx.IterateQuery(query, 10000, 60*time.Second)
docs := make(crawler.CrawlSeed, 0)
for it.Next() {
for _, hit := range it.Value().Hits.Hits {
docs = append(docs, hit.Document.Copy())
}
}
if err := it.Err(); err != nil {
fmt.Printf("Error iterating: %v\n", err)
}
cache, err := redis.DialURL(cacheURL)
clientCache := &http.Client{}
if err != nil {
@@ -61,33 +54,69 @@ func main() {
clientCache = httpclient.NewClient(cache)
}
ghCrawler := github.NewCrawler(githubToken, retryCount, clientCache,
github.QueryWith(
github.Filename("kustomization.yaml"),
github.Filename("kustomization.yml")),
)
// docConverter takes in a plain document and processes it for the index.
docConverter := func(d *doc.Document) (crawler.CrawledDocument, error) {
kdoc := doc.KustomizationDocument{
Document: *d,
}
crawler.CrawlFromSeed(ctx, docs, []crawler.Crawler{ghCrawler},
// Converter takes in a plain document and processes it for the
// index.
func(d *doc.Document) (crawler.CrawledDocument, error) {
kdoc := doc.KustomizationDocument{
Document: *d,
}
err := kdoc.ParseYAML()
return &kdoc, err
}
err := kdoc.ParseYAML()
return &kdoc, err
},
// IndexFunc updates the value in the index.
func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler) error {
switch d := cdoc.(type) {
case *doc.KustomizationDocument:
fmt.Println("Inserting: ", d.ID(), d)
_, err := idx.Put(d.ID(), d)
return err
default:
return fmt.Errorf("type %T not supported", d)
// Index updates the value in the index.
index := func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler) error {
switch d := cdoc.(type) {
case *doc.KustomizationDocument:
fmt.Println("Inserting: ", d)
_, err := idx.Put(d.ID(), d)
return err
default:
return fmt.Errorf("type %T not supported", d)
}
}
// seen tracks the IDs of all the documents in the index.
// This helps avoid indexing a given document multiple times.
seen := make(map[string]struct{})
var ghCrawler crawler.Crawler
if githubRepo != "" {
ghCrawler = github.NewCrawler(githubToken, retryCount, clientCache,
github.QueryWith(
github.Filename("kustomization.yaml"),
github.Filename("kustomization.yml"),
github.Repo(githubRepo)),
)
} else if githubUser != "" {
ghCrawler = github.NewCrawler(githubToken, retryCount, clientCache,
github.QueryWith(
github.Filename("kustomization.yaml"),
github.Filename("kustomization.yml"),
github.User(githubUser)),
)
} else {
ghCrawler = github.NewCrawler(githubToken, retryCount, clientCache,
github.QueryWith(
github.Filename("kustomization.yaml"),
github.Filename("kustomization.yml")),
)
// get all the documents in the index
query := []byte(`{ "query":{ "match_all":{} } }`)
it := idx.IterateQuery(query, 10000, 60*time.Second)
for it.Next() {
for _, hit := range it.Value().Hits.Hits {
seedDocs = append(seedDocs, hit.Document.Copy())
}
},
)
}
if err := it.Err(); err != nil {
fmt.Printf("Error iterating: %v\n", err)
}
}
crawlers := []crawler.Crawler{ghCrawler}
crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, index, seen)
crawler.CrawlGithub(ctx, crawlers, docConverter, index, seen)
}

View File

@@ -40,6 +40,7 @@ type Crawler interface {
type CrawledDocument interface {
ID() string
GetDocument() *doc.Document
// Get all the Documents directly referred in a Document.
GetResources() ([]*doc.Document, error)
WasCached() bool
}
@@ -49,135 +50,115 @@ type CrawlSeed []*doc.Document
type IndexFunc func(CrawledDocument, Crawler) error
type Converter func(*doc.Document) (CrawledDocument, error)
// Cleaner, more efficient, and more extensible crawler implementation.
// The seed must include the ids of each document in the index.
func CrawlFromSeed(ctx context.Context, seed CrawlSeed,
crawlers []Crawler, conv Converter, indx IndexFunc) {
func logIfErr(err error) {
if err == nil {
return
}
logger.Println("error: ", err)
}
seen := make(map[string]struct{})
logIfErr := func(err error) {
if err == nil {
return
func findMatch(d *doc.Document, crawlers []Crawler) Crawler {
for _, crawl := range crawlers {
if crawl.Match(d) {
return crawl
}
logger.Println("error: ", err)
}
return nil
}
func addBranches(cdoc CrawledDocument, match Crawler, indx IndexFunc,
seen map[string]struct{}, stack *CrawlSeed) {
seen[cdoc.ID()] = struct{}{}
// Insert into index
err := indx(cdoc, match)
logIfErr(err)
if err != nil {
return
}
deps, err := cdoc.GetResources()
logIfErr(err)
if err != nil {
return
}
for _, dep := range deps {
if _, ok := seen[dep.ID()]; ok {
continue
}
*stack = append(*stack, dep)
}
}
func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv Converter, indx IndexFunc,
seen map[string]struct{}, stack *CrawlSeed) {
docCount := 0
// During the execution of the for loop, more Documents may be added into (*docsPtr).
for len(*docsPtr) > 0 {
// get the last Document in (*docPtr), which will be crawled in this iteration.
tail := (*docsPtr)[len(*docsPtr)-1]
// remove the last Document in (*docPtr)
*docsPtr = (*docsPtr)[:(len(*docsPtr) - 1)]
if _, ok := seen[tail.ID()]; ok {
continue
}
docCount++
match := findMatch(tail, crawlers)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", tail))
continue
}
logger.Println("Crawling ", tail.RepositoryURL, tail.FilePath)
err := match.FetchDocument(ctx, tail)
logIfErr(err)
// If there was no change or there is an error, we don't have
// to branch out, since the dependencies are already in the
// index, or we cannot find the document.
if err != nil || tail.WasCached() {
if tail.WasCached() {
logger.Println(tail.RepositoryURL, tail.FilePath, "is cached already")
}
continue
}
logIfErr(match.SetCreated(ctx, tail))
cdoc, err := conv(tail)
logIfErr(err)
addBranches(cdoc, match, indx, seen, stack)
}
logger.Printf("%d documents were crawled by doCrawl\n", docCount)
}
// CrawlFromSeed updates all the documents in seed, and crawls all the new
// documents referred in the seed.
func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler,
conv Converter, indx IndexFunc, seen map[string]struct{}) {
// stack tracks the documents directly referred in other documents.
stack := make(CrawlSeed, 0)
findMatch := func(d *doc.Document) Crawler {
for _, crawl := range crawlers {
if crawl.Match(d) {
return crawl
}
}
return nil
}
addBranches := func(cdoc CrawledDocument, match Crawler) {
if _, ok := seen[cdoc.ID()]; ok {
return
}
seen[cdoc.ID()] = struct{}{}
// Insert into index
err := indx(cdoc, match)
logIfErr(err)
if err != nil {
return
}
deps, err := cdoc.GetResources()
logIfErr(err)
if err != nil {
return
}
for _, dep := range deps {
if _, ok := seen[dep.ID()]; ok {
continue
}
stack = append(stack, dep)
}
}
doCrawl := func(docsPtr *CrawlSeed) {
n := len(*docsPtr)
for i := 0; i < n; i++ {
next := (*docsPtr)[i]
match := findMatch(next)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", next))
continue
}
logger.Println("Crawling ", next.RepositoryURL, next.FilePath)
err := match.FetchDocument(ctx, next)
logIfErr(err)
// If there was no change or there is an error, we don't have
// to branch out, since the dependencies are already in the
// index, or we cannot find the document.
if err != nil || next.WasCached() {
if next.WasCached() {
logger.Println(next.RepositoryURL, next.FilePath, "is cached already")
}
continue
}
logIfErr(match.SetCreated(ctx, next))
cdoc, err := conv(next)
logIfErr(err)
addBranches(cdoc, match)
}
}
// Exploit seed to update bulk of corpus.
logger.Printf("updating %d documents from seed\n", len(seed))
doCrawl(&seed)
// Traverse any new links added while updating corpus.
// each unique document in seed will be crawled once.
doCrawl(ctx, &seed, crawlers, conv, indx, seen, &stack)
// Traverse any new documents added while updating corpus.
logger.Printf("crawling %d new documents found in the seed\n", len(stack))
doCrawl(&stack)
ch := make(chan CrawledDocument, 1<<10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for cdoc := range ch {
if _, ok := seen[cdoc.ID()]; ok {
continue
}
match := findMatch(cdoc.GetDocument())
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", cdoc))
continue
}
addBranches(cdoc, match)
}
}()
// Exploration through APIs.
errs := CRunner(ctx, ch, crawlers)
if errs != nil {
for _, err := range errs {
logIfErr(err)
}
}
close(ch)
logger.Println("Processing the new documents from the crawlers' exploration.")
wg.Wait()
// Handle deps of newly discovered documents.
logger.Printf("crawling the %d new documents from the crawlers' exploration.",
len(stack))
doCrawl(&stack)
// While crawling each document in stack, the documents directly referred in the document
// will be added into stack.
// After this statement is done, stack will become empty.
doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack)
}
// CRunner is a blocking function and only returns once all of the
// CrawlGithubRunner is a blocking function and only returns once all of the
// crawlers are finished with execution.
//
// This function uses the output channel to forward kustomization documents
@@ -188,14 +169,14 @@ func CrawlFromSeed(ctx context.Context, seed CrawlSeed,
// index of the crawler that emitted the error. Although the errors themselves
// can be nil, the array will always be exactly the size of the crawlers array.
//
// CRunner takes in a seed, which represents the documents stored in an
// CrawlGithubRunner takes in a seed, which represents the documents stored in an
// index somewhere. The document data is not required to be populated. If there
// are many documents, this is preferable. The order of iteration over the seed
// is not guaranteed, but the CRunner does guarantee that every element
// is not guaranteed, but the CrawlGithub does guarantee that every element
// from the seed will be processed before any other documents from the
// crawlers.
func CRunner(ctx context.Context,
output chan<- CrawledDocument, crawlers []Crawler) []error {
func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
crawlers []Crawler) []error {
errs := make([]error, len(crawlers))
wg := sync.WaitGroup{}
@@ -236,3 +217,46 @@ func CRunner(ctx context.Context,
wg.Wait()
return errs
}
// CrawlGithub crawls all the kustomization files on Github.
func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
indx IndexFunc, seen map[string]struct{}) {
// stack tracks the documents directly referred in other documents.
stack := make(CrawlSeed, 0)
// ch is channel where all the crawlers sends the crawled documents to.
ch := make(chan CrawledDocument, 1<<10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for cdoc := range ch {
if _, ok := seen[cdoc.ID()]; ok {
continue
}
match := findMatch(cdoc.GetDocument(), crawlers)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", cdoc))
continue
}
addBranches(cdoc, match, indx, seen, &stack)
}
}()
logger.Println("processing the documents found from crawling github")
if errs := CrawlGithubRunner(ctx, ch, crawlers); errs != nil {
for _, err := range errs {
logIfErr(err)
}
}
close(ch)
wg.Wait()
// Handle deps of newly discovered documents.
logger.Printf("crawling the %d new documents referred by other documents",
len(stack))
doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack)
}

View File

@@ -38,12 +38,13 @@ func (c testCrawler) FetchDocument(_ context.Context, d *doc.Document) error {
return nil
}
for _, suffix := range konfig.RecognizedKustomizationFileNames() {
fmt.Println(d.ID(), "/", suffix)
i, ok := c.lukp[d.ID()+"/"+suffix]
savedFilePath := d.FilePath
d.FilePath += "/" + suffix
i, ok := c.lukp[d.ID()]
if !ok {
d.FilePath = savedFilePath
continue
}
d.FilePath += "/" + suffix
d.DocumentData = c.docs[i].DocumentData
return nil
}
@@ -106,8 +107,8 @@ func (s sortableDocs) Len() int {
return len(s)
}
func TestCrawlerRunner(t *testing.T) {
fmt.Println("testing CRunner")
func TestCrawlGithubRunner(t *testing.T) {
fmt.Println("testing CrawlGithubRunner")
tests := []struct {
tc []Crawler
errs []error
@@ -178,7 +179,7 @@ func TestCrawlerRunner(t *testing.T) {
defer close(output)
defer wg.Done()
errs := CRunner(context.Background(),
errs := CrawlGithubRunner(context.Background(),
output, test.tc)
// Check that errors are returned as they should be.
@@ -302,29 +303,6 @@ resources:
RepositoryURL: kustomizeRepo,
FilePath: "examples/seedcrawl2/job.yaml",
}},
// Visited from the crawler runner.
{Document: doc.Document{
RepositoryURL: kustomizeRepo,
FilePath: "examples/other/base/kustomization.yaml",
DocumentData: `
resources:
- ../app
`,
}},
// Visited from the crawler runner.
{Document: doc.Document{
RepositoryURL: kustomizeRepo,
FilePath: "examples/other/app/kustomization.yaml",
DocumentData: `
resources:
- resource.yaml
`,
}},
// Visited from crawling runner imported as resource.
{Document: doc.Document{
RepositoryURL: kustomizeRepo,
FilePath: "examples/other/app/resource.yaml",
}},
},
},
}
@@ -342,6 +320,7 @@ resources:
visited[d.ID()]++
return nil
},
make(map[string]struct{}),
)
if lv, lc := len(visited), len(tc.corpus); lv != lc {
t.Errorf("error: %d of %d documents visited.", lv, lc)

View File

@@ -230,7 +230,6 @@ func kustomizationResultAdapter(gcl GhClient, k GhFileSpec) (
RepositoryURL: k.Repository.URL,
},
}
logger.Printf("Set the creationTime field")
creationTime, err := gcl.GetFileCreationTime(k)
if err != nil {
logger.Printf("GetFileCreationTime failed: %v", err)
@@ -533,7 +532,7 @@ func (gcl GhClient) parseGithubResponse(getRequest string) GhResponseInfo {
}
// SearchGithubAPI performs a search query and handles rate limitting for
// the 'code/search?' endpoint as well as timed retries in the case of abuse
// the 'search/code?' endpoint as well as timed retries in the case of abuse
// prevention.
func (gcl GhClient) SearchGithubAPI(query string) (*http.Response, error) {
throttleSearchAPI()

View File

@@ -90,6 +90,17 @@ func Path(p string) queryField {
return queryField{name: "path", value: p}
}
// Repo takes a repository (i.e., kubernetes-sigs/kustomize) and formats
// it according to the Github API.
func Repo(r string) queryField {
return queryField{name: "repo", value: r}
}
// Path takes a github username and formats it according to the Github API.
func User(u string) queryField {
return queryField{name: "user", value: u}
}
// RequestConfig stores common variables that must be present for the queries.
// - CodeSearchRequests: ask Github to check the code indices given a query.
// - ContentsRequests: ask Github where to download a resource given a repo and a
@@ -123,11 +134,15 @@ func (rc RequestConfig) ReposRequest(fullRepoName string) string {
return rc.makeRequest(uri, Query{}).URL()
}
func escapeSpace(s string) string {
return strings.Replace(s, " ", "%20", -1)
}
// CommitsRequest given the repo name, and a filepath returns a formatted query
// for the Github API to find the commits that affect this file.
func (rc RequestConfig) CommitsRequest(fullRepoName, path string) string {
uri := fmt.Sprintf("repos/%s/commits", fullRepoName)
return rc.makeRequest(uri, Query{Path(path)}).URL()
return rc.makeRequest(uri, Query{Path(escapeSpace(path))}).URL()
}
func (rc RequestConfig) makeRequest(path string, query Query) request {

View File

@@ -53,8 +53,11 @@ func TestQueryType(t *testing.T) {
Filename("kustomization.yaml"),
Keyword("keyword1"),
Keyword("keyword2"),
Repo("user1/repo1"),
User("user1"),
),
expected: "q=size:24..64+filename:kustomization.yaml+keyword1+keyword2",
expected: "q=size:24..64+filename:kustomization.yaml+keyword1+keyword2+" +
"repo:user1/repo1+user:user1",
},
}
@@ -100,6 +103,26 @@ func TestGithubSearchQuery(t *testing.T) {
expectedCommitsQuery: "https://api.github.com/repos/kubernetes-sigs/kustomize/commits?" +
"q=path:examples/helloWorld/kustomization.yaml&per_page=100",
},
{
rc: RequestConfig{
perPage: perPage,
},
codeQuery: Query{
Filename("kustomization.yaml"),
Filesize(RangeWithin{64, 128}),
},
fullRepoName: "kubernetes-sigs/kustomize",
path: "examples 1/helloWorld/kustomization.yaml",
expectedCodeQuery: "https://api.github.com/search/code?" +
"q=filename:kustomization.yaml+size:64..128&order=desc&per_page=100&sort=indexed",
expectedContentsQuery: "https://api.github.com/repos/kubernetes-sigs/kustomize/contents/" +
"examples%201/helloWorld/kustomization.yaml?per_page=100",
expectedCommitsQuery: "https://api.github.com/repos/kubernetes-sigs/kustomize/commits?" +
"q=path:examples%201/helloWorld/kustomization.yaml&per_page=100",
},
}
for _, test := range testCases {

View File

@@ -243,7 +243,7 @@ func FindRangesForRepoSearch(cache cachedSearch) ([]string, error) {
if err != nil {
return nil, err
}
logger.Println("total files: ", totalFiles)
logger.Println("total kustomization files: ", totalFiles)
if githubMaxResultsPerQuery >= totalFiles {
return []string{