diff --git a/internal/tools/index/elasticsearch.go b/internal/tools/index/elasticsearch.go new file mode 100644 index 000000000..1e75c86bb --- /dev/null +++ b/internal/tools/index/elasticsearch.go @@ -0,0 +1,266 @@ +package index + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "time" + + es "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" +) + +// TODO(damienr74) Split index into reader and writer? +type index struct { + ctx context.Context + client *es.Client + name string +} + +func newIndex(ctx context.Context, name string) (*index, error) { + client, err := es.NewDefaultClient() + if err != nil { + return nil, err + } + + return &index{ + ctx: ctx, + client: client, + name: name, + }, nil +} + +type readerFunc func(io.Reader) error + +func ignoreResponseBody(reader io.Reader) error { + return nil +} + +// checks that elastic returned successfully. If it has not, it will read the +// body and return it in an error message. +// +// Otherwise, it will use the readerFunc to read the body. This function is a +// mechanism for getting relevant data from the response only if it was successful. +func (idx *index) responseErrorOrNil(info string, res *esapi.Response, + err error, reader readerFunc) error { + + messageStart := fmt.Sprintf("index %s error: %s", idx.name, info) + if err != nil || res == nil { + return fmt.Errorf("%s: %v", messageStart, err) + } + + defer res.Body.Close() + if res.IsError() { + return fmt.Errorf("%s: %s", messageStart, res.String()) + } + + if reader != nil { + err = reader(res.Body) + if err != nil { + return fmt.Errorf("%s: %v", messageStart, err) + } + } + + return nil +} + +func byteJoin(bts ...interface{}) []byte { + ret := make([][]byte, len(bts)) + for i, v := range bts { + switch bt := v.(type) { + case []byte: + ret[i] = bt + case string: + ret[i] = []byte(bt) + default: + ret[i] = []byte(fmt.Sprintf("%v", bt)) + } + } + + return bytes.Join(ret, []byte(` `)) +} + +// Update the elasticsearch index mappings. (describes how to index/search for the documents). +func (idx *index) UpdateMapping(mappings []byte) error { + request := byteJoin(`{ "mappings":`, mappings, `}`) + + op := idx.client.Indices.PutMapping + res, err := op( + bytes.NewReader(request), + op.WithContext(idx.ctx), + op.WithIndex(idx.name), + op.WithIncludeTypeName(true), + op.WithPretty(), + ) + + return idx.responseErrorOrNil( + fmt.Sprintf("could not update index mappings '%s'", request), + res, err, ignoreResponseBody) +} + +// Update the elasticsearch index settings. (describes default parameters and +// some analyzer definitions, etc.) +func (idx *index) UpdateSetting(settings []byte) error { + request := byteJoin(`{ "settings": `, settings, `}`) + op := idx.client.Indices.PutSettings + res, err := op( + bytes.NewReader(request), + op.WithContext(idx.ctx), + op.WithIndex(idx.name), + op.WithPretty(), + ) + + return idx.responseErrorOrNil( + fmt.Sprintf("could not update index settings '%s'", request), + res, err, ignoreResponseBody) +} + +// Create an index providing both the mappings and the settings. +func (idx *index) CreateIndex(mappings []byte, settings []byte) error { + request := byteJoin(`{ "mappings":`, mappings, `, "settings":`, settings, `}`) + op := idx.client.Indices.Create + res, err := op( + idx.name, + op.WithBody(bytes.NewReader(request)), + op.WithContext(idx.ctx), + op.WithHuman(), + op.WithPretty(), + op.WithIncludeTypeName(true), + ) + + return idx.responseErrorOrNil( + fmt.Sprintf("could not create index with config '%s'", request), + res, err, ignoreResponseBody) +} + +// Delete an index. +func (idx *index) DeleteIndex() error { + res, err := idx.client.Indices.Delete( + []string{idx.name}, + ) + + return idx.responseErrorOrNil("could not delete index", + res, err, ignoreResponseBody) +} + +// Insert or update the document by ID. +func (idx *index) Put(uniqueID string, doc interface{}) (string, error) { + body, err := json.Marshal(doc) + if err != nil { + return "", err + } + + req := esapi.IndexRequest{ + Index: idx.name, + Body: bytes.NewReader(body), + DocumentID: uniqueID, + } + res, err := req.Do(idx.ctx, idx.client) + + var id string + readId := func(reader io.Reader) error { + type InsertResult struct { + ID string `json:"_id,omitempty"` + } + var ir InsertResult + data, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + + err = json.Unmarshal(data, &ir) + if err != nil { + return err + } + id = ir.ID + + return nil + } + + // populates the id field. + err = idx.responseErrorOrNil("could not insert document", + res, err, readId) + + return id, err +} + +type scrollUpdater func(string, readerFunc) error + +// Update the scroll for iteration. If no scroll exists, create one. +func (idx *index) scrollUpdater(query []byte, batchSize int, + timeout time.Duration) scrollUpdater { + + return func(scrollID string, reader readerFunc) error { + var res *esapi.Response + var err error + + if scrollID == "" { + search := idx.client.Search + res, err = search( + search.WithContext(idx.ctx), + search.WithIndex(idx.name), + search.WithBody(bytes.NewBuffer(query)), + search.WithScroll(timeout), + search.WithSize(batchSize), + ) + } else { + scroll := idx.client.Scroll + res, err = scroll( + scroll.WithContext(idx.ctx), + scroll.WithScroll(timeout), + scroll.WithScrollID(scrollID), + ) + } + + return idx.responseErrorOrNil( + fmt.Sprintf("could not scroll for query %s", query), + res, err, reader) + } +} + +// Simple search options. Size is the number of elements to return, From is the +// rank of the results according to the query. Used as a simple (stateless) +// pagination technique. +type SearchOptions struct { + Size int + From int +} + +// Search for a query (json query dsl) with some options, and use the reader func +// to extract the response. +func (idx *index) Search(query []byte, opts SearchOptions, + responseReader readerFunc) error { + + op := idx.client.Search + res, err := op( + op.WithContext(idx.ctx), + op.WithIndex(idx.name), + op.WithBody(bytes.NewBuffer(query)), + op.WithTrackTotalHits(true), + op.WithSize(opts.Size), + op.WithFrom(opts.From), + op.WithPretty(), + ) + + return idx.responseErrorOrNil( + fmt.Sprintf("could not complete search query %v", query), + res, err, responseReader) +} + +// Delete an element from elasticsearch by Id. +func (idx *index) Delete(id string) error { + op := idx.client.Delete + res, err := op( + idx.name, + id, + op.WithContext(idx.ctx), + op.WithPretty(), + ) + + return idx.responseErrorOrNil( + fmt.Sprintf("could not delete id(%s) from index(%s)", id, idx.name), + res, err, ignoreResponseBody) +} diff --git a/internal/tools/index/kustomize.go b/internal/tools/index/kustomize.go new file mode 100644 index 000000000..d82674003 --- /dev/null +++ b/internal/tools/index/kustomize.go @@ -0,0 +1,337 @@ +package index + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "strings" + "time" + + "sigs.k8s.io/kustomize/internal/tools/doc" +) + +const ( + AggregationKeyword = "aggs" +) + +// Redefinition of Hits structure. Must match the json string of +// KustomizeResult.Hits.Hits. Declared as a convenience for iteration. +type KustomizeHits []struct { + ID string `json:"id"` + Document doc.KustomizationDocument `json:"result"` +} + +type KustomizeResult struct { + ScrollID *string `json:"-"` + + Hits *struct { + Total int `json:"total"` + Hits []struct { + ID string `json:"id"` + Document doc.KustomizationDocument `json:"result"` + } `json:"hits"` + } `json:"hits,omitempty"` + + Aggregations *struct { + Timeseries *struct { + Buckets []struct { + Key string `json:"key"` + Count int `json:"count"` + } `json:"buckets"` + } `json:"timeseries,omitempty"` + + Kinds *struct { + OtherCount int `json:"otherResults"` + Buckets []struct { + Key string `json:"key"` + Count int `json:"count"` + } `json:"buckets"` + } `json:"kinds,omitempty"` + } `json:"aggregations,omitempty"` +} + +// Elasticsearch has some sometimes inconsistent labels, and some pretty ugly label choices. +// However, the structure seems reasonable, so I wanted to use it if possible. This method +// needs two copies of the types to make the json strings different. The Copies must be the +// exact same type/structure, so the types must be declared inline. Go will check that these +// are convertible at compile time, and converting at runtime is a noop. +type ElasticKustomizeResult struct { + ScrollID *string `json:"_scroll_id,omitempty"` + + Hits *struct { + Total int `json:"total"` + Hits []struct { + ID string `json:"_id"` + Document doc.KustomizationDocument `json:"_source"` + } `json:"hits"` + } `json:"hits,omitempty"` + + Aggregations *struct { + Timeseries *struct { + Buckets []struct { + Key string `json:"key_as_string"` + Count int `json:"doc_count"` + } + } `json:"timeseries,omitempty"` + + Kinds *struct { + OtherCount int `json:"sum_other_doc_count"` + Buckets []struct { + Key string `json:"key"` + Count int `json:"doc_count"` + } + } `json:"kinds,omitempty"` + } `json:"aggregations,omitempty"` +} + +type KustomizeIndex struct { + *index +} + +// Create index reference to the index containing the kustomize documents. +func NewKustomizeIndex(ctx context.Context) (*KustomizeIndex, error) { + idx, err := newIndex(ctx, "kustomize") + if err != nil { + return nil, err + } + return &KustomizeIndex{idx}, nil +} + +// Return a timeseries of kustomization file counts. +func TimeseriesAggregation() (string, map[string]interface{}) { + return "timeseries", map[string]interface{}{ + "date_histogram": map[string]interface{}{ + "field": "creationTime", + "interval": "day", + /// XXX Only return values with counts, otherwise + // every day is added to the output... + // This matters if ever a zero valued time would + // be stored in the creationTime field... it would + // return >600k entries (for every day since year 0). + // IDK why this is default, but I would not want this + // to happen... + "min_doc_count": 1, + }, + } +} + +// Return aggregation of results based off of their kinds. +func KindAggregation(maxBuckets int) (string, map[string]interface{}) { + if maxBuckets < 1 { + maxBuckets = 1 + } + return "kinds", map[string]interface{}{ + "terms": map[string]interface{}{ + "field": "kinds.keyword", + "size": maxBuckets, + }, + } +} + +// The multi_match search type in elasticsearch will check each field according +// to their respective analyzers for the identifier. +func multiMatch(query string) map[string]interface{} { + return map[string]interface{}{ + "multi_match": map[string]interface{}{ + "type": "cross_fields", + "fields": []string{ + "values.keyword^3", + "identifiers.keyword^3", + "values.ngram", + "identifiers.ngram", + // TODO(damienr74) remove document with default + // analyzer. It does not handle special (=,: etc) + // characters properly, and matches with false + // positives. document.whitespace does not exist + // yet, but should use the whitespace analyzer. + "document", + "document.whitespace", + }, + "query": query, + }, + } +} + +// Build an elasticsearch query from a user query. +func BuildQuery(query string) map[string]interface{} { + queryTokens := strings.Fields(query) + if len(queryTokens) == 0 { + return map[string]interface{}{ + "size": 0, + } + } + + mustMatch := make([]map[string]interface{}, len(queryTokens)) + + for i, tok := range queryTokens { + if strings.HasPrefix(strings.ToLower(tok), "kind=") { + mustMatch[i] = map[string]interface{}{ + "term": map[string]interface{}{ + "kinds.keyword": tok[5:], + }, + } + continue + } + mustMatch[i] = multiMatch(tok) + } + + structuredQuery := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": mustMatch, + }, + }, + } + + return structuredQuery +} + +// Iterator based off of the way bufio.Scanner works. +// +// Example: +// for it.Next() { +// for _, doc := range it.Value().Hits { +// // Handle KustomizationDocument. +// } +// } +// +// if err := it.Err(); err != nil { +// // Handle err. +// } +type KustomizeIterator struct { + update scrollUpdater + err error + // Matches the return definition of elasticsearch search results. The + // scroll ID is practically a database cursor. + scrollImpl KustomizeResult +} + +// Get the next batch of results. Note that this returns multiple results that +// can be iterated. +func (it *KustomizeIterator) Next() bool { + reader := func(reader io.Reader) error { + data, err := ioutil.ReadAll(reader) + if err != nil { + return fmt.Errorf("could not read from body: %v", err) + } + var scrollInput ElasticKustomizeResult + err = json.Unmarshal(data, &scrollInput) + if err != nil { + return fmt.Errorf("cloud not marshal %s into %T: %v", + data, scrollInput, err) + } + it.scrollImpl = KustomizeResult(scrollInput) + + return nil + } + + if it.err == nil { + fmt.Printf("updating scroll: %s\n", *it.scrollImpl.ScrollID) + it.err = it.update(*it.scrollImpl.ScrollID, reader) + } + + // if there is no error and the array is not empty, then Value is + // obligated to return a valid result. + return it.err == nil && + it.scrollImpl.Hits != nil && + len(it.scrollImpl.Hits.Hits) > 0 +} + +// Get the value from this batch of iterations. +func (it *KustomizeIterator) Value() KustomizeResult { + return it.scrollImpl +} + +// Check if any errors have occured. +func (it *KustomizeIterator) Err() error { + return it.err +} + +// Create an iterator over query. Iterate in chunks of batchSize, each batch +// should take no longer than timeout to read (otherwise, elasticsearch will +// delete the context). +// +// XXX Important to set a reasonable amount of time to read the documents. If +// a lot of processing must be done, consider loading everything in memory +// before doing it so that, a short timeout period can be set. Scrolling creates +// a consistent DB context, so this can be costly. +// +// Scrolling is also not meant to be used for real time purposes. If you need +// results quickly, consider using the From: field in SearchOptions and a normal +// search. This will not guarantee that the values will not change but is more +// suitable for lower latencies/long execution timeouts. +func (ki *KustomizeIndex) IterateQuery(query []byte, batchSize int, + timeout time.Duration) *KustomizeIterator { + + emptyScroll := "" + return &KustomizeIterator{ + update: ki.scrollUpdater(query, batchSize, timeout), + scrollImpl: KustomizeResult{ + ScrollID: &emptyScroll, + }, + } +} + +// type specific Put for inserting structured kustomization documents. +func (ki *KustomizeIndex) Put(id string, doc *doc.KustomizationDocument) (string, error) { + id, err := ki.index.Put(id, doc) + if err != nil { + return id, fmt.Errorf("could not insert in elastic: %v", err) + } + return id, nil +} + +// Kustomize search options: What metrics should be returned? Kind Aggregation, +// TimeseriesAggregation, etc. Also embedds the SearchOptions field to specify +// the position in the sorted list of results and the number of results to return. +type KustomizeSearchOptions struct { + SearchOptions + KindAggregation bool + TimeseriesAggregation bool +} + +// Search the index with the given query string. Returns a structured result and possible +// aggregates. +func (ki *KustomizeIndex) Search(query string, + opts KustomizeSearchOptions) (*KustomizeResult, error) { + + aggMap := make(map[string]interface{}) + if opts.KindAggregation { + k, kAgg := KindAggregation(15) + aggMap[k] = kAgg + } + if opts.TimeseriesAggregation { + t, tAgg := TimeseriesAggregation() + aggMap[t] = tAgg + } + + esQuery := BuildQuery(query) + if len(aggMap) > 0 { + esQuery[AggregationKeyword] = aggMap + } + + data, err := json.Marshal(&esQuery) + if err != nil { + return nil, fmt.Errorf("failed to format query %s", query) + } + fmt.Printf("formated query: %s\n", data) + + var kr ElasticKustomizeResult + err = ki.index.Search(data, opts.SearchOptions, func(results io.Reader) error { + data, err = ioutil.ReadAll(results) + if err != nil { + return fmt.Errorf("could not read results from search: %v", err) + } + + if err = json.Unmarshal(data, &kr); err != nil { + return fmt.Errorf("could not parse results from search: %v", err) + } + + return nil + }) + res := KustomizeResult(kr) + + return &res, err +} diff --git a/internal/tools/index/kustomize_test.go b/internal/tools/index/kustomize_test.go new file mode 100644 index 000000000..6ad4cc58e --- /dev/null +++ b/internal/tools/index/kustomize_test.go @@ -0,0 +1,72 @@ +package index + +import ( + "reflect" + "testing" +) + +func TestBuildQuery(t *testing.T) { + testCases := []struct { + query string + result map[string]interface{} + }{ + { + query: " \t\n\r", + result: map[string]interface{}{"size": 0}, + }, + { + query: "\tidentifier1 identifier2\nidentifier3\r", + result: map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + multiMatch("identifier1"), + multiMatch("identifier2"), + multiMatch("identifier3"), + }, + }, + }, + }, + }, + { + query: "kind=Kustomization", + result: map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + { + "term": map[string]interface{}{ + "kinds.keyword": "Kustomization", + }, + }, + }, + }, + }, + }, + }, + { + query: "kind=Kustomization identifier2", + result: map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + { + "term": map[string]interface{}{ + "kinds.keyword": "Kustomization", + }, + }, + multiMatch("identifier2"), + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + result := BuildQuery(tc.query) + if !reflect.DeepEqual(tc.result, result) { + t.Errorf("Expected %#v to match %#v", result, tc.result) + } + } +}