Separate the two types of crawling

1) crawling the documents in the index to update these documents;
2) crawling the whole github.
This commit is contained in:
Haiyan Meng
2019-12-12 12:37:04 -08:00
parent d9239104aa
commit 50ce2a66a3
3 changed files with 181 additions and 173 deletions

View File

@@ -67,27 +67,35 @@ func main() {
github.Filename("kustomization.yml")),
)
crawler.CrawlFromSeed(ctx, docs, []crawler.Crawler{ghCrawler},
// Converter takes in a plain document and processes it for the
// index.
func(d *doc.Document) (crawler.CrawledDocument, error) {
kdoc := doc.KustomizationDocument{
Document: *d,
}
// docConverter takes in a plain document and processes it for the index.
docConverter := func(d *doc.Document) (crawler.CrawledDocument, error) {
kdoc := doc.KustomizationDocument{
Document: *d,
}
err := kdoc.ParseYAML()
return &kdoc, err
},
// IndexFunc updates the value in the index.
func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler) error {
switch d := cdoc.(type) {
case *doc.KustomizationDocument:
fmt.Println("Inserting: ", d.ID(), d)
_, err := idx.Put(d.ID(), d)
return err
default:
return fmt.Errorf("type %T not supported", d)
}
},
)
err := kdoc.ParseYAML()
return &kdoc, err
}
// Index updates the value in the index.
index := func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler) error {
switch d := cdoc.(type) {
case *doc.KustomizationDocument:
fmt.Println("Inserting: ", d.ID(), d)
_, err := idx.Put(d.ID(), d)
return err
default:
return fmt.Errorf("type %T not supported", d)
}
}
// seen tracks the IDs of all the documents in the index.
// This helps avoid indexing a given document multiple times.
seen := make(map[string]struct{})
crawlers := []crawler.Crawler{ghCrawler}
crawler.CrawlFromSeed(ctx, docs, crawlers, docConverter, index, seen)
crawler.CrawlGithub(ctx, crawlers, docConverter, index, seen)
}

View File

@@ -40,6 +40,7 @@ type Crawler interface {
type CrawledDocument interface {
ID() string
GetDocument() *doc.Document
// Get all the Documents directly referred in a Document.
GetResources() ([]*doc.Document, error)
WasCached() bool
}
@@ -49,135 +50,113 @@ type CrawlSeed []*doc.Document
type IndexFunc func(CrawledDocument, Crawler) error
type Converter func(*doc.Document) (CrawledDocument, error)
// Cleaner, more efficient, and more extensible crawler implementation.
// The seed must include the ids of each document in the index.
func CrawlFromSeed(ctx context.Context, seed CrawlSeed,
crawlers []Crawler, conv Converter, indx IndexFunc) {
func logIfErr(err error) {
if err == nil {
return
}
logger.Println("error: ", err)
}
seen := make(map[string]struct{})
logIfErr := func(err error) {
if err == nil {
return
func findMatch(d *doc.Document, crawlers []Crawler) Crawler {
for _, crawl := range crawlers {
if crawl.Match(d) {
return crawl
}
logger.Println("error: ", err)
}
return nil
}
func addBranches(cdoc CrawledDocument, match Crawler, indx IndexFunc,
seen map[string]struct{}, stack *CrawlSeed) {
if _, ok := seen[cdoc.ID()]; ok {
return
}
seen[cdoc.ID()] = struct{}{}
// Insert into index
err := indx(cdoc, match)
logIfErr(err)
if err != nil {
return
}
deps, err := cdoc.GetResources()
logIfErr(err)
if err != nil {
return
}
for _, dep := range deps {
if _, ok := seen[dep.ID()]; ok {
continue
}
*stack = append(*stack, dep)
}
}
func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv Converter, indx IndexFunc,
seen map[string]struct{}, stack *CrawlSeed) {
docCount := 0
// During the execution of the for loop, more Documents may be added into (*docsPtr).
for len(*docsPtr) > 0 {
docCount++
// get the last Document in (*docPtr), which will be crawled in this iteration.
tail := (*docsPtr)[len(*docsPtr)-1]
// remove the last Document in (*docPtr)
*docsPtr = (*docsPtr)[:(len(*docsPtr)-1)]
match := findMatch(tail, crawlers)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", tail))
continue
}
logger.Println("Crawling ", tail.RepositoryURL, tail.FilePath)
err := match.FetchDocument(ctx, tail)
logIfErr(err)
// If there was no change or there is an error, we don't have
// to branch out, since the dependencies are already in the
// index, or we cannot find the document.
if err != nil || tail.WasCached() {
if tail.WasCached() {
logger.Println(tail.RepositoryURL, tail.FilePath, "is cached already")
}
continue
}
logIfErr(match.SetCreated(ctx, tail))
cdoc, err := conv(tail)
logIfErr(err)
addBranches(cdoc, match, indx, seen, stack)
}
logger.Printf("%d documents were crawled by doCrawl\n", docCount)
}
// CrawlFromSeed updates all the documents in seed, and crawls all the new
// documents referred in the seed.
func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler,
conv Converter, indx IndexFunc, seen map[string]struct{}) {
// stack tracks the documents directly referred in other documents.
stack := make(CrawlSeed, 0)
findMatch := func(d *doc.Document) Crawler {
for _, crawl := range crawlers {
if crawl.Match(d) {
return crawl
}
}
return nil
}
addBranches := func(cdoc CrawledDocument, match Crawler) {
if _, ok := seen[cdoc.ID()]; ok {
return
}
seen[cdoc.ID()] = struct{}{}
// Insert into index
err := indx(cdoc, match)
logIfErr(err)
if err != nil {
return
}
deps, err := cdoc.GetResources()
logIfErr(err)
if err != nil {
return
}
for _, dep := range deps {
if _, ok := seen[dep.ID()]; ok {
continue
}
stack = append(stack, dep)
}
}
doCrawl := func(docsPtr *CrawlSeed) {
n := len(*docsPtr)
for i := 0; i < n; i++ {
next := (*docsPtr)[i]
match := findMatch(next)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", next))
continue
}
logger.Println("Crawling ", next.RepositoryURL, next.FilePath)
err := match.FetchDocument(ctx, next)
logIfErr(err)
// If there was no change or there is an error, we don't have
// to branch out, since the dependencies are already in the
// index, or we cannot find the document.
if err != nil || next.WasCached() {
if next.WasCached() {
logger.Println(next.RepositoryURL, next.FilePath, "is cached already")
}
continue
}
logIfErr(match.SetCreated(ctx, next))
cdoc, err := conv(next)
logIfErr(err)
addBranches(cdoc, match)
}
}
// Exploit seed to update bulk of corpus.
logger.Printf("updating %d documents from seed\n", len(seed))
doCrawl(&seed)
// Traverse any new links added while updating corpus.
// each unique document in seed will be crawled once.
doCrawl(ctx, &seed, crawlers, conv, indx, seen, &stack)
// Traverse any new documents added while updating corpus.
logger.Printf("crawling %d new documents found in the seed\n", len(stack))
doCrawl(&stack)
ch := make(chan CrawledDocument, 1<<10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for cdoc := range ch {
if _, ok := seen[cdoc.ID()]; ok {
continue
}
match := findMatch(cdoc.GetDocument())
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", cdoc))
continue
}
addBranches(cdoc, match)
}
}()
// Exploration through APIs.
errs := CRunner(ctx, ch, crawlers)
if errs != nil {
for _, err := range errs {
logIfErr(err)
}
}
close(ch)
logger.Println("Processing the new documents from the crawlers' exploration.")
wg.Wait()
// Handle deps of newly discovered documents.
logger.Printf("crawling the %d new documents from the crawlers' exploration.",
len(stack))
doCrawl(&stack)
// While crawling each document in stack, the documents directly referred in the document
// will be added into stack.
// After this statement is done, stack will become empty.
doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack)
}
// CRunner is a blocking function and only returns once all of the
// CrawlGithubRunner is a blocking function and only returns once all of the
// crawlers are finished with execution.
//
// This function uses the output channel to forward kustomization documents
@@ -188,14 +167,14 @@ func CrawlFromSeed(ctx context.Context, seed CrawlSeed,
// index of the crawler that emitted the error. Although the errors themselves
// can be nil, the array will always be exactly the size of the crawlers array.
//
// CRunner takes in a seed, which represents the documents stored in an
// CrawlGithubRunner takes in a seed, which represents the documents stored in an
// index somewhere. The document data is not required to be populated. If there
// are many documents, this is preferable. The order of iteration over the seed
// is not guaranteed, but the CRunner does guarantee that every element
// is not guaranteed, but the CrawlGithub does guarantee that every element
// from the seed will be processed before any other documents from the
// crawlers.
func CRunner(ctx context.Context,
output chan<- CrawledDocument, crawlers []Crawler) []error {
func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
crawlers []Crawler) []error {
errs := make([]error, len(crawlers))
wg := sync.WaitGroup{}
@@ -236,3 +215,45 @@ func CRunner(ctx context.Context,
wg.Wait()
return errs
}
// CrawlGithub crawls all the kustomization files on Github.
func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
indx IndexFunc, seen map[string]struct{}) {
// stack tracks the documents directly referred in other documents.
stack := make(CrawlSeed, 0)
// ch is channel where all the crawlers sends the crawled documents to.
ch := make(chan CrawledDocument, 1<<10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for cdoc := range ch {
if _, ok := seen[cdoc.ID()]; ok {
continue
}
match := findMatch(cdoc.GetDocument(), crawlers)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", cdoc))
continue
}
addBranches(cdoc, match, indx, seen, &stack)
}
}()
if errs := CrawlGithubRunner(ctx, ch, crawlers); errs != nil {
for _, err := range errs {
logIfErr(err)
}
}
close(ch)
logger.Println("Processing the documents found from crawling github")
wg.Wait()
// Handle deps of newly discovered documents.
logger.Printf("crawling the %d new documents referred by other documents",
len(stack))
doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack)
}

View File

@@ -38,12 +38,13 @@ func (c testCrawler) FetchDocument(_ context.Context, d *doc.Document) error {
return nil
}
for _, suffix := range konfig.RecognizedKustomizationFileNames() {
fmt.Println(d.ID(), "/", suffix)
i, ok := c.lukp[d.ID()+"/"+suffix]
savedFilePath := d.FilePath
d.FilePath += "/" + suffix
i, ok := c.lukp[d.ID()]
if !ok {
d.FilePath = savedFilePath
continue
}
d.FilePath += "/" + suffix
d.DocumentData = c.docs[i].DocumentData
return nil
}
@@ -106,8 +107,8 @@ func (s sortableDocs) Len() int {
return len(s)
}
func TestCrawlerRunner(t *testing.T) {
fmt.Println("testing CRunner")
func TestCrawlGithubRunner(t *testing.T) {
fmt.Println("testing CrawlGithubRunner")
tests := []struct {
tc []Crawler
errs []error
@@ -178,7 +179,7 @@ func TestCrawlerRunner(t *testing.T) {
defer close(output)
defer wg.Done()
errs := CRunner(context.Background(),
errs := CrawlGithubRunner(context.Background(),
output, test.tc)
// Check that errors are returned as they should be.
@@ -302,29 +303,6 @@ resources:
RepositoryURL: kustomizeRepo,
FilePath: "examples/seedcrawl2/job.yaml",
}},
// Visited from the crawler runner.
{Document: doc.Document{
RepositoryURL: kustomizeRepo,
FilePath: "examples/other/base/kustomization.yaml",
DocumentData: `
resources:
- ../app
`,
}},
// Visited from the crawler runner.
{Document: doc.Document{
RepositoryURL: kustomizeRepo,
FilePath: "examples/other/app/kustomization.yaml",
DocumentData: `
resources:
- resource.yaml
`,
}},
// Visited from crawling runner imported as resource.
{Document: doc.Document{
RepositoryURL: kustomizeRepo,
FilePath: "examples/other/app/resource.yaml",
}},
},
},
}
@@ -342,6 +320,7 @@ resources:
visited[d.ID()]++
return nil
},
make(map[string]struct{}),
)
if lv, lc := len(visited), len(tc.corpus); lv != lc {
t.Errorf("error: %d of %d documents visited.", lv, lc)