Fix insert/update document logic

This commit is contained in:
Haiyan Meng
2019-12-17 12:14:41 -08:00
parent 2c2aa928cc
commit bef157d6b3
3 changed files with 57 additions and 42 deletions

View File

@@ -65,7 +65,7 @@ func main() {
} }
// Index updates the value in the index. // Index updates the value in the index.
index := func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler, mode index.Mode) error { indexFunc := func(cdoc crawler.CrawledDocument, crwlr crawler.Crawler, mode index.Mode) error {
switch d := cdoc.(type) { switch d := cdoc.(type) {
case *doc.KustomizationDocument: case *doc.KustomizationDocument:
switch mode { switch mode {
@@ -74,8 +74,7 @@ func main() {
return idx.Delete(d.ID()) return idx.Delete(d.ID())
default: default:
fmt.Println("Inserting: ", d) fmt.Println("Inserting: ", d)
_, err := idx.Put(d.ID(), d) return idx.Put(d.ID(), d)
return err
} }
default: default:
return fmt.Errorf("type %T not supported", d) return fmt.Errorf("type %T not supported", d)
@@ -123,6 +122,6 @@ func main() {
} }
crawlers := []crawler.Crawler{ghCrawler} crawlers := []crawler.Crawler{ghCrawler}
crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, index, seen) crawler.CrawlFromSeed(ctx, seedDocs, crawlers, docConverter, indexFunc, seen)
crawler.CrawlGithub(ctx, crawlers, docConverter, index, seen) crawler.CrawlGithub(ctx, crawlers, docConverter, indexFunc, seen)
} }

View File

@@ -6,7 +6,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"time" "time"
es "github.com/elastic/go-elasticsearch/v6" es "github.com/elastic/go-elasticsearch/v6"
@@ -179,47 +178,47 @@ func (idx *index) DeleteIndex() error {
} }
// Insert or update the document by ID. // Insert or update the document by ID.
func (idx *index) Put(uniqueID string, doc interface{}) (string, error) { func (idx *index) Put(uniqueID string, doc interface{}) error {
docBytes, err := json.Marshal(doc) exists, err := idx.Exists(uniqueID)
if err != nil { if err != nil {
return "", err return err
} }
body := byteJoin(`{"doc":`, docBytes, `}`)
// Use `UpdateRequest` here instead of `IndexRequest`. if exists {
// For a document with a given id, every call of IndexRequest.Do will increase the version of a document. docBytes, err := json.Marshal(doc)
req := esapi.UpdateRequest{ if err != nil {
Index: idx.name, return err
Body: bytes.NewReader(body),
DocumentID: uniqueID,
}
res, err := req.Do(idx.ctx, idx.client)
var id string
readId := func(reader io.Reader) error {
type InsertResult struct {
ID string `json:"_id,omitempty"`
} }
var ir InsertResult body := byteJoin(`{"doc":`, docBytes, `}`)
data, err := ioutil.ReadAll(reader)
// For a document with a given id, every call of IndexRequest.Do will increase the version of a document.
// To avoid increasing the document version unnecessarily, use UpdateRequest here.
req := esapi.UpdateRequest{
Index: idx.name,
Body: bytes.NewReader(body),
DocumentID: uniqueID,
}
res, err := req.Do(idx.ctx, idx.client)
err = idx.responseErrorOrNil("could not update document",
res, err, ignoreResponseBody)
} else {
body, err := json.Marshal(doc)
if err != nil { if err != nil {
return err return err
} }
err = json.Unmarshal(data, &ir) req := esapi.IndexRequest{
if err != nil { Index: idx.name,
return err Body: bytes.NewReader(body),
DocumentID: uniqueID,
} }
id = ir.ID res, err := req.Do(idx.ctx, idx.client)
return nil err = idx.responseErrorOrNil("could not insert document",
res, err, ignoreResponseBody)
} }
return err
// populates the id field.
err = idx.responseErrorOrNil("could not insert document",
res, err, readId)
return id, err
} }
type scrollUpdater func(string, readerFunc) error type scrollUpdater func(string, readerFunc) error
@@ -299,3 +298,24 @@ func (idx *index) Delete(id string) error {
fmt.Sprintf("could not delete id(%s) from index(%s)", id, idx.name), fmt.Sprintf("could not delete id(%s) from index(%s)", id, idx.name),
res, err, ignoreResponseBody) res, err, ignoreResponseBody)
} }
// Exists reports whether a document with the given id is present in the index.
//
// It returns (true, nil) for any non-error response, (false, nil) when the
// server answers with status 404, and (false, err) when the request itself
// fails or the server answers with any other error status.
func (idx *index) Exists(id string) (bool, error) {
	op := idx.client.Exists
	res, err := op(
		idx.name,
		id,
		op.WithContext(idx.ctx),
		op.WithPretty(),
	)

	msg := fmt.Sprintf("could not check the existence of id(%s) from index(%s)", id, idx.name)

	// A failed request may come back without a response, so err must be
	// inspected before res is touched; calling res.IsError() on a nil
	// response would panic.
	if err != nil {
		return false, fmt.Errorf("%s: %v", msg, err)
	}
	if res == nil {
		return false, fmt.Errorf("%s: no response received", msg)
	}

	found := !res.IsError()
	if found || res.StatusCode == 404 {
		// The response is not handed to responseErrorOrNil on these paths,
		// so release its body here. A close failure does not change the
		// answer and is therefore deliberately ignored.
		if res.Body != nil {
			_ = res.Body.Close()
		}
		return found, nil
	}

	// Any other error status is reported through the shared response helper.
	return false, idx.responseErrorOrNil(msg, res, err, ignoreResponseBody)
}

View File

@@ -299,12 +299,8 @@ func (ki *KustomizeIndex) IterateQuery(query []byte, batchSize int,
} }
// type specific Put for inserting structured kustomization documents. // type specific Put for inserting structured kustomization documents.
func (ki *KustomizeIndex) Put(id string, doc *doc.KustomizationDocument) (string, error) { func (ki *KustomizeIndex) Put(id string, doc *doc.KustomizationDocument) error {
id, err := ki.index.Put(id, doc) return ki.index.Put(id, doc)
if err != nil {
return id, fmt.Errorf("could not insert in elastic: %v", err)
}
return id, nil
} }
// Delete a document with a given id from the kustomize index. // Delete a document with a given id from the kustomize index.