Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved error handling for bulk indexing to provide more information… #219

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"log"
"time"

elastigo "github.com/mattbaird/elastigo/lib"
elastigo "github.com/zaphod-concur/elastigo/lib"
)

var (
Expand Down
10 changes: 7 additions & 3 deletions lib/baserequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,22 @@ func (c *Conn) DoCommand(method string, url string, args map[string]interface{},

httpStatusCode, body, err = req.Do(&response)
if err != nil {
return body, err
if httpStatusCode == -1 {
//connection error like *url.Error
return body, err
}
//error reading response body, or something else after we obtained an HTTP status code
return body, ESError{time.Now(), fmt.Sprintf("Error [%v] Status [%v]", err, httpStatusCode), httpStatusCode}
}
if httpStatusCode > 304 {

jsonErr := json.Unmarshal(body, &response)
if jsonErr == nil {
if res_err, ok := response["error"]; ok {
status, _ := response["status"]
return body, ESError{time.Now(), fmt.Sprintf("Error [%s] Status [%v]", res_err, status), httpStatusCode}
}
}
return body, jsonErr
return body, ESError{time.Now(), fmt.Sprintf("Error [%v] Status [%v]", jsonErr, httpStatusCode), httpStatusCode}
}
return body, nil
}
Expand Down
127 changes: 105 additions & 22 deletions lib/corebulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,43 @@ type ErrorBuffer struct {
Buf *bytes.Buffer
}

// BulkIndexingError is the error returned when a bulk response reports
// failures. Items holds the decoded "items" array of that response; it
// contains every item that was decoded, not only the ones that failed, so
// callers can inspect each per-operation result.
type BulkIndexingError struct {
	Items []map[string]interface{}
}

// Error implements the error interface. The reported count covers only the
// items that actually failed, so the message matches what it claims even
// though Items also carries the successful operations.
func (e BulkIndexingError) Error() string {
	return fmt.Sprintf("Bulk Insertion Error. Failed item count [%d]", e.failedCount())
}

// failedCount returns the number of entries in Items that carry a status
// above 304, the same threshold the bulk sender uses when counting errors.
// Entries whose result is not a JSON object, or that have no numeric status,
// are not counted.
func (e BulkIndexingError) failedCount() int {
	failed := 0
	for _, item := range e.Items {
		// Each item maps an operation name to that operation's result object.
		for _, raw := range item {
			result, ok := raw.(map[string]interface{})
			if !ok {
				continue
			}
			var code float64
			switch s := result["status"].(type) {
			case float64: // what encoding/json produces for JSON numbers
				code = s
			case int:
				code = float64(s)
			case int64:
				code = float64(s)
			}
			if code > 304 {
				failed++
				break // count each item at most once
			}
		}
	}
	return failed
}

// DocVersion carries explicit versioning information for a single bulk
// operation. When a non-nil *DocVersion is passed to the bulk helpers, Version
// is written as the "_version" field of the action metadata and VersionType,
// if non-empty, is written as "_version_type".
type DocVersion struct {
// The version to assign to the document
Version int64

// The Version Type to assign to the document.
// When left empty, no "_version_type" field is emitted.
VersionType VersionType
}

// VersionType names one of the version types accepted by Elasticsearch. Its
// string value is what gets written into the "_version_type" field.
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types
type VersionType string

// The allowed version types. Each constant's value is the literal string
// sent to Elasticsearch.
const (
	INTERNAL     = VersionType("internal")
	EXTERNAL     = VersionType("external")
	EXTERNAL_GT  = VersionType("external_gt")
	EXTERNAL_GTE = VersionType("external_gte")
	FORCE        = VersionType("force")
)

// V pairs the receiver version type with the version number v and returns
// the result as a newly allocated DocVersion.
func (t VersionType) V(v int64) *DocVersion {
	dv := new(DocVersion)
	dv.VersionType = t
	dv.Version = v
	return dv
}

// A bulk indexer creates goroutines, and channels for connecting and sending data
// to elasticsearch in bulk, using buffers.
type BulkIndexer struct {
Expand Down Expand Up @@ -94,6 +131,9 @@ type BulkIndexer struct {
mu sync.Mutex
// Wait Group for the http sends
sendWg *sync.WaitGroup

// numPendingSends tracks queued buffers pending 'send' in sendBuf
numPendingSends int64
}

func (b *BulkIndexer) NumErrors() uint64 {
Expand Down Expand Up @@ -162,9 +202,15 @@ func (b *BulkIndexer) Stop() {
}

// PendingDocuments returns the current value of the indexer's document
// counter (docCt). The read is performed while holding b.mu.
// NOTE(review): docCt presumably counts documents buffered but not yet
// flushed; confirm against the code that updates it.
func (b *BulkIndexer) PendingDocuments() int {
	b.mu.Lock()
	pending := b.docCt
	b.mu.Unlock()
	return pending
}

// PendingSends returns the current value of numPendingSends, read atomically.
// The counter is incremented before a buffer is queued on sendBuf and
// decremented by the HTTP sender goroutine once it has handled a buffer.
func (b *BulkIndexer) PendingSends() int64 {
	pending := atomic.LoadInt64(&b.numPendingSends)
	return pending
}

// Flush all current documents to ElasticSearch
func (b *BulkIndexer) Flush() {
b.mu.Lock()
Expand All @@ -184,7 +230,7 @@ func (b *BulkIndexer) startHttpSender() {
b.sendWg.Add(1)
go func() {
for buf := range b.sendBuf {
// Copy for the potential re-send.
// Copy so we can put the buffer on the error channel, or potentially re-send it.
bufCopy := bytes.NewBuffer(buf.Bytes())
err := b.Sender(buf)

Expand All @@ -193,18 +239,18 @@ func (b *BulkIndexer) startHttpSender() {
// 2. Retry then return error and let runner decide
// 3. Retry, then log to disk? retry later?
if err != nil {
buf = bufCopy
if b.RetryForSeconds > 0 {
//copy again so we can keep the original buffer for the error channel.
bufCopy := bytes.NewBuffer(buf.Bytes())
time.Sleep(time.Second * time.Duration(b.RetryForSeconds))
err = b.Sender(bufCopy)
if err == nil {
// Successfully re-sent with no error
continue
}
}
if b.ErrorChannel != nil {
if err != nil && b.ErrorChannel != nil {
b.ErrorChannel <- &ErrorBuffer{err, buf}
}
}
atomic.AddInt64(&b.numPendingSends, -1)
}
b.sendWg.Done()
}()
Expand Down Expand Up @@ -260,6 +306,7 @@ func (b *BulkIndexer) startDocChannel() {

func (b *BulkIndexer) send(buf *bytes.Buffer) {
//b2 := *b.buf
atomic.AddInt64(&b.numPendingSends, 1)
b.sendBuf <- buf
b.buf = new(bytes.Buffer)
// b.buf.Reset()
Expand All @@ -277,28 +324,35 @@ func (b *BulkIndexer) shutdown() {
// The index bulk API adds or updates a typed JSON document to a specific index, making it searchable.
// It operates by buffering requests, and occasionally flushing to elasticsearch
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error {
func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, data interface{}) error {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
by, err := WriteBulkBytes("index", index, _type, id, parent, ttl, date, data)
by, err := WriteBulkBytes("index", index, _type, id, parent, ttl, date, version, data)
if err != nil {
return err
}
b.bulkChannel <- by
return nil
}

func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error {
func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, data interface{}) error {
//{ "update" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
by, err := WriteBulkBytes("update", index, _type, id, parent, ttl, date, data)
by, err := WriteBulkBytes("update", index, _type, id, parent, ttl, date, version, data)
if err != nil {
return err
}
b.bulkChannel <- by
return nil
}

func (b *BulkIndexer) Delete(index, _type, id string) {
queryLine := fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q}}\n", index, _type, id)
func (b *BulkIndexer) Delete(index, _type, id string, version *DocVersion) {
verStr := ""
if version != nil {
verStr = fmt.Sprintf(",\"_version\":%d", version.Version)
if version.VersionType != "" {
verStr = verStr + ",\"_version_type\":\"" + string(version.VersionType) + "\""
}
}
queryLine := fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q%s}}\n", index, _type, id, verStr)
b.bulkChannel <- []byte(queryLine)
return
}
Expand All @@ -307,27 +361,27 @@ func (b *BulkIndexer) UpdateWithWithScript(index string, _type string, id, paren

var data map[string]interface{} = make(map[string]interface{})
data["script"] = script
return b.Update(index, _type, id, parent, ttl, date, data)
return b.Update(index, _type, id, parent, ttl, date, nil, data)
}

func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, ttl string, date *time.Time, partialDoc interface{}, upsert bool) error {
func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, partialDoc interface{}, upsert bool) error {

var data map[string]interface{} = make(map[string]interface{})

data["doc"] = partialDoc
if upsert {
data["doc_as_upsert"] = true
}
return b.Update(index, _type, id, parent, ttl, date, data)
return b.Update(index, _type, id, parent, ttl, date, version, data)
}

// This does the actual send of a buffer, which has already been formatted
// into bytes of ES formatted bulk data
func (b *BulkIndexer) Send(buf *bytes.Buffer) error {
type responseStruct struct {
Took int64 `json:"took"`
Errors bool `json:"errors"`
Items []map[string]interface{} `json:"items"`
Took int64 `json:"took"`
Errors bool `json:"errors"`
Items *json.RawMessage `json:"items"`
}

response := responseStruct{}
Expand All @@ -341,17 +395,37 @@ func (b *BulkIndexer) Send(buf *bytes.Buffer) error {
// check for response errors, bulk insert will give 200 OK but then include errors in response
jsonErr := json.Unmarshal(body, &response)
if jsonErr == nil {
//unmarshal the errors only if we need to
if response.Errors {
atomic.AddUint64(&b.numErrors, uint64(len(response.Items)))
return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]", len(response.Items))
var items []map[string]interface{}
jsonErr = json.Unmarshal([]byte(*response.Items), &items)
if jsonErr != nil {
return jsonErr
}

for _, item := range items {
for _, body := range item {
body, ok := body.(map[string]interface{})
if !ok {
continue
}
if status, ok := body["status"]; ok && status.(float64) > 304 {
atomic.AddUint64(&b.numErrors, 1)
}
}
}

return BulkIndexingError{items}
}
} else {
return jsonErr
}
return nil
}

// Given a set of arguments for index, type, id, data create a set of bytes that is formatted for bulk indexing
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func WriteBulkBytes(op string, index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) ([]byte, error) {
func WriteBulkBytes(op string, index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, data interface{}) ([]byte, error) {
// only index and update are currently supported
if op != "index" && op != "update" {
return nil, errors.New(fmt.Sprintf("Operation '%s' is not yet supported", op))
Expand All @@ -376,7 +450,7 @@ func WriteBulkBytes(op string, index string, _type string, id, parent, ttl strin
buf.WriteString(`"`)
}

if op == "update" {
if op == "update" && version == nil {
buf.WriteString(`,"_retry_on_conflict":3`)
}

Expand All @@ -390,6 +464,15 @@ func WriteBulkBytes(op string, index string, _type string, id, parent, ttl strin
buf.WriteString(strconv.FormatInt(date.UnixNano()/1e6, 10))
buf.WriteString(`"`)
}
if version != nil {
buf.WriteString(`,"_version":`)
buf.WriteString(strconv.FormatInt(version.Version, 10))
if version.VersionType != "" {
buf.WriteString(`,"_version_type":"`)
buf.WriteString(string(version.VersionType))
buf.WriteString(`"`)
}
}

buf.WriteString(`}}`)
buf.WriteRune('\n')
Expand Down
Loading