From bef157d6b38d15917b81567429f20718b995a880 Mon Sep 17 00:00:00 2001 From: Haiyan Meng Date: Tue, 17 Dec 2019 12:14:41 -0800 Subject: [PATCH] Fix insert/updating document logic --- api/internal/crawl/cmd/crawler/crawler.go | 9 ++- api/internal/crawl/index/elasticsearch.go | 82 ++++++++++++++--------- api/internal/crawl/index/kustomize.go | 8 +-- 3 files changed, 57 insertions(+), 42 deletions(-) diff --git a/api/internal/crawl/cmd/crawler/crawler.go b/api/internal/crawl/cmd/crawler/crawler.go index c4e913706..ea73efd04 100644 --- a/api/internal/crawl/cmd/crawler/crawler.go +++ b/api/internal/crawl/cmd/crawler/crawler.go @@ -65,7 +65,7 @@ func main() { } // Index updates the value in the index. - index := func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler, mode index.Mode) error { + indexFunc := func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler, mode index.Mode) error { switch d := cdoc.(type) { case *doc.KustomizationDocument: switch mode { @@ -74,8 +74,7 @@ func main() { return idx.Delete(d.ID()) default: fmt.Println("Inserting: ", d) - _, err := idx.Put(d.ID(), d) - return err + return idx.Put(d.ID(), d) } default: return fmt.Errorf("type %T not supported", d) @@ -123,6 +122,6 @@ func main() { } crawlers := []crawler.Crawler{ghCrawler} - crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, index, seen) - crawler.CrawlGithub(ctx, crawlers, docConverter, index, seen) + crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, indexFunc, seen) + crawler.CrawlGithub(ctx, crawlers, docConverter, indexFunc, seen) } diff --git a/api/internal/crawl/index/elasticsearch.go b/api/internal/crawl/index/elasticsearch.go index 2696d8dfd..b80d7f901 100644 --- a/api/internal/crawl/index/elasticsearch.go +++ b/api/internal/crawl/index/elasticsearch.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "time" es "github.com/elastic/go-elasticsearch/v6" @@ -179,47 +178,47 @@ func (idx *index) DeleteIndex() error { } // Insert or update the document by ID. -func (idx *index) Put(uniqueID string, doc interface{}) (string, error) { - docBytes, err := json.Marshal(doc) +func (idx *index) Put(uniqueID string, doc interface{}) error { + exists, err := idx.Exists(uniqueID) if err != nil { - return "", err + return err } - body := byteJoin(`{"doc":`, docBytes, `}`) - // Use `UpdateRequest` here instead of `IndexRequest`. - // For a document with a given id, every call of IndexRequest.Do will increase the version of a document. - req := esapi.UpdateRequest{ - Index: idx.name, - Body: bytes.NewReader(body), - DocumentID: uniqueID, - } - res, err := req.Do(idx.ctx, idx.client) - - var id string - readId := func(reader io.Reader) error { - type InsertResult struct { - ID string `json:"_id,omitempty"` + if exists { + docBytes, err := json.Marshal(doc) + if err != nil { + return err } - var ir InsertResult - data, err := ioutil.ReadAll(reader) + body := byteJoin(`{"doc":`, docBytes, `}`) + + // For a document with a given id, every call of IndexRequest.Do will increase the version of a document. + // To avoid increasing the document version unnecessarily, use UpdateRequest here. + req := esapi.UpdateRequest{ + Index: idx.name, + Body: bytes.NewReader(body), + DocumentID: uniqueID, + } + res, err := req.Do(idx.ctx, idx.client) + + err = idx.responseErrorOrNil("could not update document", + res, err, ignoreResponseBody) + } else { + body, err := json.Marshal(doc) if err != nil { return err } - err = json.Unmarshal(data, &ir) - if err != nil { - return err + req := esapi.IndexRequest{ + Index: idx.name, + Body: bytes.NewReader(body), + DocumentID: uniqueID, } - id = ir.ID + res, err := req.Do(idx.ctx, idx.client) - return nil + err = idx.responseErrorOrNil("could not insert document", + res, err, ignoreResponseBody) } - - // populates the id field. - err = idx.responseErrorOrNil("could not insert document", - res, err, readId) - - return id, err + return err } type scrollUpdater func(string, readerFunc) error @@ -299,3 +298,24 @@ func (idx *index) Delete(id string) error { fmt.Sprintf("could not delete id(%s) from index(%s)", id, idx.name), res, err, ignoreResponseBody) } + +// Check whether a given document id is in the index +func (idx *index) Exists(id string) (bool, error) { + op := idx.client.Exists + res, err := op( + idx.name, + id, + op.WithContext(idx.ctx), + op.WithPretty(), + ) + + if !res.IsError() { + return true, nil + } else if res.StatusCode == 404 { + return false, nil + } else { + return false, idx.responseErrorOrNil( + fmt.Sprintf("could not check the existence of id(%s) from index(%s)", id, idx.name), + res, err, ignoreResponseBody) + } +} diff --git a/api/internal/crawl/index/kustomize.go b/api/internal/crawl/index/kustomize.go index 430142fc3..cedea28bb 100644 --- a/api/internal/crawl/index/kustomize.go +++ b/api/internal/crawl/index/kustomize.go @@ -299,12 +299,8 @@ func (ki *KustomizeIndex) IterateQuery(query []byte, batchSize int, } // type specific Put for inserting structured kustomization documents. -func (ki *KustomizeIndex) Put(id string, doc *doc.KustomizationDocument) (string, error) { - id, err := ki.index.Put(id, doc) - if err != nil { - return id, fmt.Errorf("could not insert in elastic: %v", err) - } - return id, nil +func (ki *KustomizeIndex) Put(id string, doc *doc.KustomizationDocument) error { + return ki.index.Put(id, doc) } // Delete a document with a given id from the kustomize index.