Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add parent and routing options for bulk operations #292

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions lib/corebulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ func (b *BulkIndexer) Start() {
if b.Sender == nil {
b.Sender = b.Send
}
// Resize bulk channel buffer if max docs greater than default
if b.BulkMaxDocs > BulkMaxDocs {
b.bulkChannel = make(chan []byte, b.BulkMaxDocs)
}
// Backwards compatibility
b.startHttpSender()
b.startDocChannel()
Expand Down Expand Up @@ -278,48 +282,56 @@ func (b *BulkIndexer) shutdown() {
// The index bulk API adds or updates a typed JSON document to a specific index, making it searchable.
// it operates by buffering requests, and ocassionally flushing to elasticsearch
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error {
func (b *BulkIndexer) Index(index string, _type string, id, parent, routing, ttl string, date *time.Time, data interface{}) error {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
by, err := WriteBulkBytes("index", index, _type, id, parent, ttl, date, data)
by, err := WriteBulkBytes("index", index, _type, id, parent, routing, ttl, date, data)
if err != nil {
return err
}
b.bulkChannel <- by
return nil
}

func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error {
func (b *BulkIndexer) Update(index string, _type string, id, parent, routing, ttl string, date *time.Time, data interface{}) error {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
by, err := WriteBulkBytes("update", index, _type, id, parent, ttl, date, data)
by, err := WriteBulkBytes("update", index, _type, id, parent, routing, ttl, date, data)
if err != nil {
return err
}
b.bulkChannel <- by
return nil
}

func (b *BulkIndexer) Delete(index, _type, id string) {
queryLine := fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q}}\n", index, _type, id)
b.bulkChannel <- []byte(queryLine)
func (b *BulkIndexer) Delete(index, _type, _parent, _routing, id string) {
queryLine := bytes.Buffer{}
queryLine.WriteString(fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q", index, _type, id))
if len(_parent) > 0 {
queryLine.WriteString(fmt.Sprintf(",\"_parent\":%q", _parent))
}
if len(_routing) > 0 {
queryLine.WriteString(fmt.Sprintf(",\"_routing\":%q", _routing))
}
queryLine.WriteString("}}\n")
b.bulkChannel <- queryLine.Bytes()
return
}

func (b *BulkIndexer) UpdateWithWithScript(index string, _type string, id, parent, ttl string, date *time.Time, script string) error {
func (b *BulkIndexer) UpdateWithWithScript(index string, _type string, id, parent, routing, ttl string, date *time.Time, script string) error {

var data map[string]interface{} = make(map[string]interface{})
data["script"] = script
return b.Update(index, _type, id, parent, ttl, date, data)
return b.Update(index, _type, id, parent, routing, ttl, date, data)
}

func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, ttl string, date *time.Time, partialDoc interface{}, upsert bool) error {
func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, routing, ttl string, date *time.Time, partialDoc interface{}, upsert bool) error {

var data map[string]interface{} = make(map[string]interface{})

data["doc"] = partialDoc
if upsert {
data["doc_as_upsert"] = true
}
return b.Update(index, _type, id, parent, ttl, date, data)
return b.Update(index, _type, id, parent, routing, ttl, date, data)
}

// This does the actual send of a buffer, which has already been formatted
Expand All @@ -344,15 +356,20 @@ func (b *BulkIndexer) Send(buf *bytes.Buffer) error {
if jsonErr == nil {
if response.Errors {
atomic.AddUint64(&b.numErrors, uint64(len(response.Items)))
return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]", len(response.Items))
itemJson, err := json.Marshal(response.Items)
if err == nil {
return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]. Response Items: %s", len(response.Items), itemJson)
} else {
return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]. Error marshalling response items: %s", len(response.Items), err)
}
}
}
return nil
}

// Given a set of arguments for index, type, id, data create a set of bytes that is formatted for bulkd index
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func WriteBulkBytes(op string, index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) ([]byte, error) {
func WriteBulkBytes(op string, index string, _type string, id, parent, routing, ttl string, date *time.Time, data interface{}) ([]byte, error) {
// only index and update are currently supported
if op != "index" && op != "update" {
return nil, errors.New(fmt.Sprintf("Operation '%s' is not yet supported", op))
Expand All @@ -377,6 +394,12 @@ func WriteBulkBytes(op string, index string, _type string, id, parent, ttl strin
buf.WriteString(`"`)
}

if len(routing) > 0 {
buf.WriteString(`,"_routing":"`)
buf.WriteString(routing)
buf.WriteString(`"`)
}

if op == "update" {
buf.WriteString(`,"_retry_on_conflict":3`)
}
Expand Down
98 changes: 86 additions & 12 deletions lib/corebulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestBulkIndexerBasic(t *testing.T) {
"date": "yesterday",
}

err := indexer.Index(testIndex, "user", "1", "", "", &date, data)
err := indexer.Index(testIndex, "user", "1", "", "", "", &date, data)
waitFor(func() bool {
return buffers.Length() > 0
}, 5)
Expand All @@ -112,7 +112,7 @@ func TestBulkIndexerBasic(t *testing.T) {
expectedBytes := 129
assert.T(t, totalBytesSent == expectedBytes, fmt.Sprintf("Should have sent %v bytes but was %v", expectedBytes, totalBytesSent))

err = indexer.Index(testIndex, "user", "2", "", "", nil, data)
err = indexer.Index(testIndex, "user", "2", "", "", "", nil, data)
waitFor(func() bool {
return buffers.Length() > 1
}, 5)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestRefreshParam(t *testing.T) {
indexer.Start()
<-time.After(time.Millisecond * 20)

indexer.Index("users", "user", "2", "", "", &date, data)
indexer.Index("users", "user", "2", "", "", "", &date, data)

<-time.After(time.Millisecond * 200)
// indexer.Flush()
Expand All @@ -174,7 +174,7 @@ func TestWithoutRefreshParam(t *testing.T) {
indexer.Start()
<-time.After(time.Millisecond * 20)

indexer.Index("users", "user", "2", "", "", &date, data)
indexer.Index("users", "user", "2", "", "", "", &date, data)

<-time.After(time.Millisecond * 200)
// indexer.Flush()
Expand Down Expand Up @@ -215,7 +215,7 @@ func XXXTestBulkUpdate(t *testing.T) {
data := map[string]interface{}{
"script": "ctx._source.count += 2",
}
err = indexer.Update("users", "user", "5", "", "", &date, data)
err = indexer.Update("users", "user", "5", "", "", "", &date, data)
// So here's the deal. Flushing does seem to work, you just have to give the
// channel a moment to recieve the message ...
// <- time.After(time.Millisecond * 20)
Expand Down Expand Up @@ -261,16 +261,57 @@ func TestBulkSmallBatch(t *testing.T) {
indexer.Start()
<-time.After(time.Millisecond * 20)

indexer.Index("users", "user", "2", "", "", &date, data)
indexer.Index("users", "user", "3", "", "", &date, data)
indexer.Index("users", "user", "4", "", "", &date, data)
indexer.Index("users", "user", "2", "", "", "", &date, data)
indexer.Index("users", "user", "3", "", "", "", &date, data)
indexer.Index("users", "user", "4", "", "", "", &date, data)
<-time.After(time.Millisecond * 200)
// indexer.Flush()
indexer.Stop()
assert.T(t, messageSets == 2, fmt.Sprintf("Should have sent 2 message sets %d", messageSets))

}

func TestBulkInsertWithMeta(t *testing.T) {
InitTests(true)
var lock sync.Mutex
c := NewTestConn()
indexer := c.NewBulkIndexer(1)
sentBytes := []byte{}

indexer.Sender = func(buf *bytes.Buffer) error {
lock.Lock()
sentBytes = append(sentBytes, buf.Bytes()...)
lock.Unlock()
return nil
}

indexer.Start()

data := map[string]interface{}{
"name": "smurfs",
"age": 22,
"date": "yesterday",
}

indexer.Index(testIndex, "user", "1", "p", "", "", nil, data)
indexer.Index(testIndex, "user", "2", "", "r", "", nil, data)

indexer.Flush()
indexer.Stop()

lock.Lock()
sent := string(sentBytes)
lock.Unlock()

expected := `{"index":{"_index":"github","_type":"user","_id":"1","_parent":"p"}}
{"age":22,"date":"yesterday","name":"smurfs"}
{"index":{"_index":"github","_type":"user","_id":"2","_routing":"r"}}
{"age":22,"date":"yesterday","name":"smurfs"}
`
asExpected := sent == expected
assert.T(t, asExpected, fmt.Sprintf("Should have sent '%s' but actually sent '%s'", expected, sent))
}

func TestBulkDelete(t *testing.T) {
InitTests(true)
var lock sync.Mutex
Expand All @@ -287,7 +328,7 @@ func TestBulkDelete(t *testing.T) {

indexer.Start()

indexer.Delete("fake", "fake_type", "1")
indexer.Delete("fake", "fake_type", "", "", "1")

indexer.Flush()
indexer.Stop()
Expand All @@ -302,6 +343,39 @@ func TestBulkDelete(t *testing.T) {
assert.T(t, asExpected, fmt.Sprintf("Should have sent '%s' but actually sent '%s'", expected, sent))
}

func TestBulkDeleteWithMeta(t *testing.T) {
InitTests(true)
var lock sync.Mutex
c := NewTestConn()
indexer := c.NewBulkIndexer(1)
sentBytes := []byte{}

indexer.Sender = func(buf *bytes.Buffer) error {
lock.Lock()
sentBytes = append(sentBytes, buf.Bytes()...)
lock.Unlock()
return nil
}

indexer.Start()

indexer.Delete("fake", "fake_type", "p", "", "1")
indexer.Delete("fake", "fake_type", "", "r", "1")

indexer.Flush()
indexer.Stop()

lock.Lock()
sent := string(sentBytes)
lock.Unlock()

expected := `{"delete":{"_index":"fake","_type":"fake_type","_id":"1","_parent":"p"}}
{"delete":{"_index":"fake","_type":"fake_type","_id":"1","_routing":"r"}}
`
asExpected := sent == expected
assert.T(t, asExpected, fmt.Sprintf("Should have sent '%s' but actually sent '%s'", expected, sent))
}

func XXXTestBulkErrors(t *testing.T) {
// lets set a bad port, and hope we get a conn refused error?
c := NewTestConn()
Expand All @@ -316,7 +390,7 @@ func XXXTestBulkErrors(t *testing.T) {
for i := 0; i < 20; i++ {
date := time.Unix(1257894000, 0)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": date}
indexer.Index("users", "user", strconv.Itoa(i), "", "", &date, data)
indexer.Index("users", "user", strconv.Itoa(i), "", "", "", &date, data)
}
}()
var errBuf *ErrorBuffer
Expand Down Expand Up @@ -356,7 +430,7 @@ func BenchmarkSend(b *testing.B) {
about := make([]byte, 1000)
rand.Read(about)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0), "about": about}
indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, data)
indexer.Index("users", "user", strconv.Itoa(i), "", "", "", nil, data)
}
log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes)
if indexer.NumErrors() != 0 {
Expand Down Expand Up @@ -390,7 +464,7 @@ func BenchmarkSendBytes(b *testing.B) {
return indexer.Send(buf)
}
for i := 0; i < b.N; i++ {
indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, body)
indexer.Index("users", "user", strconv.Itoa(i), "", "", "", nil, body)
}
log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes)
if indexer.NumErrors() != 0 {
Expand Down
2 changes: 1 addition & 1 deletion lib/coretest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func LoadTestData() {
log.Println("HM, already exists? ", ge.Url)
}
docsm[id] = true
indexer.Index(testIndex, ge.Type, id, "", "", &ge.Created, line)
indexer.Index(testIndex, ge.Type, id, "", "", "", &ge.Created, line)
docCt++
} else {
log.Println("ERROR? ", string(line))
Expand Down