mirror of
https://github.com/kubernetes-sigs/kustomize.git
synced 2026-06-10 08:20:59 +00:00
Implementation of basic crawler organisation.
A `crawler.Crawler` interface is defined; a crawler has to implement a `Crawl` method that forwards the documents it finds to a channel. A helper function is also implemented that launches a list of crawlers concurrently, merges their channels into one main output channel, and returns their errors. Finally, a test that verifies the correctness and concurrency of the helper function is provided.
This commit is contained in:
76
internal/search/crawler/crawler.go
Normal file
76
internal/search/crawler/crawler.go
Normal file
@@ -0,0 +1,76 @@
|
||||
// Package crawler provides helper methods and defines an interface for launching
|
||||
// source repository crawlers that retrieve files from a source and forward them
|
||||
// to a channel for indexing and retrieval.
|
||||
package crawler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"sigs.k8s.io/kustomize/internal/search/doc"
|
||||
)
|
||||
|
||||
// Crawler forwards documents from source repositories to index and store them
|
||||
// for searching. Each crawler is responsible for querying it's source of
|
||||
// information, and forwarding files that have not been seen before or that need
|
||||
// updating.
|
||||
type Crawler interface {
|
||||
// Crawl returns when it is done processing. This method does not take
|
||||
// ownership of the channel. The channel is write only, and it
|
||||
// designates where the crawler should forward the documents.
|
||||
Crawl(ctx context.Context, output chan<- *doc.KustomizationDocument) error
|
||||
}
|
||||
|
||||
// CrawlerRunner is a blocking function and only returns once all of the
|
||||
// crawlers are finished with execution.
|
||||
//
|
||||
// This function uses the output channel to forward kustomization documents
|
||||
// from a list of crawlers. The output is to be consumed by a database/search
|
||||
// indexer for later retrieval.
|
||||
//
|
||||
// The return value is an array of errors in which each index represents the
|
||||
// index of the crawler that emitted the error. Although the errors themselves
|
||||
// can be nil, the array will always be exactly the size of the crawlers array.
|
||||
func CrawlerRunner(ctx context.Context,
|
||||
output chan<- *doc.KustomizationDocument, crawlers []Crawler) []error {
|
||||
|
||||
errs := make([]error, len(crawlers))
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for i, crawler := range crawlers {
|
||||
// Crawler implementations get their own channels to prevent a
|
||||
// crawler from closing the main output channel.
|
||||
docs := make(chan *doc.KustomizationDocument)
|
||||
wg.Add(2)
|
||||
|
||||
// Forward all of the documents from this crawler's channel to
|
||||
// the main output channel.
|
||||
go func(docs <-chan *doc.KustomizationDocument) {
|
||||
defer wg.Done()
|
||||
for doc := range docs {
|
||||
output <- doc
|
||||
}
|
||||
}(docs)
|
||||
|
||||
// Run this crawler and capture its returned error.
|
||||
go func(idx int, crawler Crawler,
|
||||
docs chan<- *doc.KustomizationDocument) {
|
||||
|
||||
defer func() {
|
||||
wg.Done()
|
||||
if r := recover(); r != nil {
|
||||
errs[idx] = fmt.Errorf(
|
||||
"%+v panicked: %v, additional error %v",
|
||||
crawler, r, errs[idx],
|
||||
)
|
||||
}
|
||||
}()
|
||||
defer close(docs)
|
||||
errs[idx] = crawler.Crawl(ctx, docs)
|
||||
}(i, crawler, docs) // Copies the index and the crawler
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return errs
|
||||
}
|
||||
124
internal/search/crawler/crawler_test.go
Normal file
124
internal/search/crawler/crawler_test.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package crawler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"sigs.k8s.io/kustomize/internal/search/doc"
|
||||
)
|
||||
|
||||
// Simple crawler that forwards it's list of documents to a provided channel and
|
||||
// returns it's error to the caller.
|
||||
type testCrawler struct {
|
||||
docs []doc.KustomizationDocument
|
||||
err error
|
||||
}
|
||||
|
||||
// Crawl implements the Crawler interface for testing.
|
||||
func (c testCrawler) Crawl(ctx context.Context,
|
||||
output chan<- *doc.KustomizationDocument) error {
|
||||
|
||||
for i := range c.docs {
|
||||
output <- &c.docs[i]
|
||||
}
|
||||
return c.err
|
||||
}
|
||||
|
||||
// Used to make sure that we're comparing documents in order. This is needed
|
||||
// since these documents will be sent concurrently.
|
||||
type sortableDocs []doc.KustomizationDocument
|
||||
|
||||
func (s sortableDocs) Less(i, j int) bool {
|
||||
return s[i].FilePath < s[j].FilePath
|
||||
}
|
||||
|
||||
func (s sortableDocs) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
func (s sortableDocs) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func TestCrawlerRunner(t *testing.T) {
|
||||
tests := []struct {
|
||||
tc []Crawler
|
||||
errs []error
|
||||
docs sortableDocs
|
||||
}{
|
||||
{
|
||||
tc: []Crawler{
|
||||
testCrawler{
|
||||
docs: []doc.KustomizationDocument{
|
||||
{FilePath: "crawler1/doc1"},
|
||||
{FilePath: "crawler1/doc2"},
|
||||
{FilePath: "crawler1/doc3"},
|
||||
},
|
||||
},
|
||||
testCrawler{err: errors.New("crawler2")},
|
||||
testCrawler{},
|
||||
testCrawler{
|
||||
docs: []doc.KustomizationDocument{
|
||||
{FilePath: "crawler4/doc1"},
|
||||
{FilePath: "crawler4/doc2"},
|
||||
},
|
||||
err: errors.New("crawler4"),
|
||||
},
|
||||
},
|
||||
errs: []error{
|
||||
nil,
|
||||
errors.New("crawler2"),
|
||||
nil,
|
||||
errors.New("crawler4"),
|
||||
},
|
||||
docs: sortableDocs{
|
||||
{FilePath: "crawler1/doc1"},
|
||||
{FilePath: "crawler1/doc2"},
|
||||
{FilePath: "crawler1/doc3"},
|
||||
{FilePath: "crawler4/doc1"},
|
||||
{FilePath: "crawler4/doc2"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
output := make(chan *doc.KustomizationDocument)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
||||
// Run the Crawler runner with a list of crawlers.
|
||||
go func() {
|
||||
defer close(output)
|
||||
defer wg.Done()
|
||||
|
||||
errs := CrawlerRunner(context.Background(), output,
|
||||
test.tc)
|
||||
|
||||
// Check that errors are returned as they should be.
|
||||
if !reflect.DeepEqual(errs, test.errs) {
|
||||
t.Errorf("Expected errs (%v) to equal (%v)",
|
||||
errs, test.errs)
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
// Iterate over the output channel of Crawler runner.
|
||||
returned := make(sortableDocs, 0, len(test.docs))
|
||||
for doc := range output {
|
||||
returned = append(returned, *doc)
|
||||
}
|
||||
|
||||
// Check that all documents are received.
|
||||
sort.Sort(returned)
|
||||
if !reflect.DeepEqual(returned, test.docs) {
|
||||
t.Errorf("Expected docs (%v) to equal (%v)\n",
|
||||
returned, test.docs)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user