From ca41674df318191f1ac4e044811bad2040beff0e Mon Sep 17 00:00:00 2001
From: Damien Robichaud
Date: Tue, 9 Jul 2019 13:04:10 -0700
Subject: [PATCH] Implementation of basic crawler organisation.

`crawler.Crawler` interface is defined, where a crawler has to implement
a `Crawl` method that forwards documents found by the crawler to a
channel.

A helper function that launches a list of crawlers concurrently and
merges their channels into one main output channel, forwarding errors,
is also implemented.

Finally, a test that verifies the correctness and concurrency of the
helper method is provided.
---
 internal/search/crawler/crawler.go      |  76 +++++++++++++++
 internal/search/crawler/crawler_test.go | 124 ++++++++++++++++++++++++
 2 files changed, 200 insertions(+)
 create mode 100644 internal/search/crawler/crawler.go
 create mode 100644 internal/search/crawler/crawler_test.go

diff --git a/internal/search/crawler/crawler.go b/internal/search/crawler/crawler.go
new file mode 100644
index 000000000..499da8dbe
--- /dev/null
+++ b/internal/search/crawler/crawler.go
@@ -0,0 +1,76 @@
+// Package crawler defines an interface for source repository crawlers and
+// provides helpers for launching them. Crawlers retrieve files from a source
+// and forward them to a channel for indexing and retrieval.
+package crawler
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"sigs.k8s.io/kustomize/internal/search/doc"
+)
+
+// Crawler forwards documents from source repositories to index and store them
+// for searching. Each crawler is responsible for querying its source of
+// information, and forwarding files that have not been seen before or that need
+// updating.
+type Crawler interface {
+	// Crawl returns when it is done processing. This method does not take
+	// ownership of the channel. The channel is write only, and it
+	// designates where the crawler should forward the documents.
+	Crawl(ctx context.Context, output chan<- *doc.KustomizationDocument) error
+}
+
+// CrawlerRunner is a blocking function and only returns once all of the
+// crawlers are finished with execution.
+//
+// This function uses the output channel to forward kustomization documents
+// from a list of crawlers. The output is to be consumed by a database/search
+// indexer for later retrieval. The output channel is never closed here.
+//
+// The returned slice has exactly len(crawlers) entries: entry i holds the
+// error returned by crawlers[i] (possibly nil), or an error describing the
+// panic if that crawler panicked.
+func CrawlerRunner(ctx context.Context,
+	output chan<- *doc.KustomizationDocument, crawlers []Crawler) []error {
+
+	errs := make([]error, len(crawlers))
+	var wg sync.WaitGroup
+
+	for i, crawler := range crawlers {
+		// Crawler implementations get their own channels to prevent a
+		// crawler from closing the main output channel.
+		docs := make(chan *doc.KustomizationDocument)
+		wg.Add(2)
+
+		// Forward all of the documents from this crawler's channel to
+		// the main output channel.
+		go func(docs <-chan *doc.KustomizationDocument) {
+			defer wg.Done()
+			for d := range docs {
+				output <- d
+			}
+		}(docs)
+
+		// Run this crawler and capture its returned error.
+		go func(idx int, crawler Crawler,
+			docs chan<- *doc.KustomizationDocument) {
+			// Registered first so it runs last (defers are LIFO): errs[idx]
+			// must be fully written before wg.Wait returns in the caller.
+			defer wg.Done()
+			defer func() {
+				if r := recover(); r != nil {
+					errs[idx] = fmt.Errorf(
+						"%+v panicked: %v, additional error %v",
+						crawler, r, errs[idx])
+				}
+			}()
+			defer close(docs)
+			errs[idx] = crawler.Crawl(ctx, docs)
+		}(i, crawler, docs) // Copies the index and the crawler
+	}
+
+	wg.Wait()
+	return errs
+}
diff --git a/internal/search/crawler/crawler_test.go b/internal/search/crawler/crawler_test.go
new file mode 100644
index 000000000..b8a6af86d
--- /dev/null
+++ b/internal/search/crawler/crawler_test.go
@@ -0,0 +1,124 @@
+package crawler
+
+import (
+	"context"
+	"errors"
+	"reflect"
+	"sort"
+	"sync"
+	"testing"
+
+	"sigs.k8s.io/kustomize/internal/search/doc"
+)
+
+// Simple crawler that forwards its list of documents to a provided channel and
+// returns its error to the caller.
+type testCrawler struct {
+	docs []doc.KustomizationDocument
+	err  error
+}
+
+// Crawl implements the Crawler interface for testing.
+func (c testCrawler) Crawl(ctx context.Context,
+	output chan<- *doc.KustomizationDocument) error {
+
+	for i := range c.docs {
+		output <- &c.docs[i]
+	}
+	return c.err
+}
+
+// Used to make sure that we're comparing documents in order. This is needed
+// since these documents will be sent concurrently.
+type sortableDocs []doc.KustomizationDocument
+
+func (sd sortableDocs) Len() int {
+	return len(sd)
+}
+
+func (sd sortableDocs) Less(i, j int) bool {
+	return sd[i].FilePath < sd[j].FilePath
+}
+
+func (sd sortableDocs) Swap(i, j int) {
+	sd[i], sd[j] = sd[j], sd[i]
+}
+
+func TestCrawlerRunner(t *testing.T) {
+	tests := []struct {
+		crawlers []Crawler
+		wantErrs []error
+		wantDocs sortableDocs
+	}{
+		{
+			crawlers: []Crawler{
+				testCrawler{
+					docs: []doc.KustomizationDocument{
+						{FilePath: "crawler1/doc1"},
+						{FilePath: "crawler1/doc2"},
+						{FilePath: "crawler1/doc3"},
+					},
+				},
+				testCrawler{err: errors.New("crawler2")},
+				testCrawler{},
+				testCrawler{
+					docs: []doc.KustomizationDocument{
+						{FilePath: "crawler4/doc1"},
+						{FilePath: "crawler4/doc2"},
+					},
+					err: errors.New("crawler4"),
+				},
+			},
+			wantErrs: []error{
+				nil,
+				errors.New("crawler2"),
+				nil,
+				errors.New("crawler4"),
+			},
+			wantDocs: sortableDocs{
+				{FilePath: "crawler1/doc1"},
+				{FilePath: "crawler1/doc2"},
+				{FilePath: "crawler1/doc3"},
+				{FilePath: "crawler4/doc1"},
+				{FilePath: "crawler4/doc2"},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		out := make(chan *doc.KustomizationDocument)
+		var wg sync.WaitGroup
+		wg.Add(1)
+
+		// Drive CrawlerRunner in the background. Closing out once it
+		// returns is what ends the collection loop further down.
+		go func() {
+			defer close(out)
+			defer wg.Done()
+
+			got := CrawlerRunner(context.Background(), out,
+				tt.crawlers)
+
+			// One error slot per crawler, in crawler order.
+			if !reflect.DeepEqual(got, tt.wantErrs) {
+				t.Errorf("Expected errs (%v) to equal (%v)",
+					got, tt.wantErrs)
+			}
+		}()
+
+		// Gather every document forwarded on the shared channel.
+		received := make(sortableDocs, 0, len(tt.wantDocs))
+		for d := range out {
+			received = append(received, *d)
+		}
+
+		// Arrival order is not deterministic, so sort before comparing.
+		sort.Sort(received)
+		if !reflect.DeepEqual(received, tt.wantDocs) {
+			t.Errorf("Expected docs (%v) to equal (%v)\n",
+				received, tt.wantDocs)
+		}
+
+		wg.Wait()
+	}
+}