Skip to content

Commit f8e0de1

Browse files
committed
retry upsert on recoverable error.
1 parent eb5ba52 commit f8e0de1

File tree

2 files changed

+125
-1
lines changed

2 files changed

+125
-1
lines changed

lib/fluent/plugin/elasticsearch_error_handler.rb

+2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
6060
write_operation = @plugin.write_operation
6161
elsif INDEX_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(CREATE_OP)
6262
write_operation = CREATE_OP
63+
elsif UPSERT_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(UPDATE_OP)
64+
write_operation = UPDATE_OP
6365
elsif item.nil?
6466
stats[:errors_nil_resp] += 1
6567
next

test/plugin/test_elasticsearch_error_handler.rb

+123-1
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ class TestElasticsearchErrorHandler < Test::Unit::TestCase
77

88
class TestPlugin
99
attr_reader :log
10-
attr_reader :write_operation, :error_events
10+
attr_reader :error_events
1111
attr_accessor :unrecoverable_error_types
1212
attr_accessor :log_es_400_reason
13+
attr_accessor :write_operation
1314
def initialize(log, log_es_400_reason = false)
1415
@log = log
1516
@write_operation = 'index'
@@ -522,4 +523,125 @@ def test_unrecoverable_error_included_in_responses
522523

523524
end
524525

526+
def test_retry_error_upsert
527+
@plugin.write_operation = 'upsert'
528+
records = []
529+
error_records = Hash.new(false)
530+
error_records.merge!({0=>true, 4=>true, 9=>true})
531+
10.times do |i|
532+
records << {time: 12345, record: {"message"=>"record #{i}","_id"=>i,"raise"=>error_records[i]}}
533+
end
534+
chunk = MockChunk.new(records)
535+
536+
response = parse_response(%({
537+
"took" : 1,
538+
"errors" : true,
539+
"items" : [
540+
{
541+
"update" : {
542+
"_index" : "foo",
543+
"_type" : "bar",
544+
"_id" : "1",
545+
"status" : 201
546+
}
547+
},
548+
{
549+
"update" : {
550+
"_index" : "foo",
551+
"_type" : "bar",
552+
"_id" : "2",
553+
"status" : 500,
554+
"error" : {
555+
"type" : "some unrecognized type",
556+
"reason":"unrecognized error"
557+
}
558+
}
559+
},
560+
{
561+
"update" : {
562+
"_index" : "foo",
563+
"_type" : "bar",
564+
"_id" : "3",
565+
"status" : 409,
566+
"error" : {
567+
"type":"version_conflict_engine_exception",
568+
"reason":"document already exists"
569+
}
570+
}
571+
},
572+
{
573+
"update" : {
574+
"_index" : "foo",
575+
"_type" : "bar",
576+
"_id" : "5",
577+
"status" : 500,
578+
"error" : {
579+
"reason":"unrecognized error - no type field"
580+
}
581+
}
582+
},
583+
{
584+
"update" : {
585+
"_index" : "foo",
586+
"_type" : "bar",
587+
"_id" : "6",
588+
"status" : 400,
589+
"error" : {
590+
"type" : "mapper_parsing_exception",
591+
"reason":"failed to parse"
592+
}
593+
}
594+
},
595+
{
596+
"update" : {
597+
"_index" : "foo",
598+
"_type" : "bar",
599+
"_id" : "7",
600+
"status" : 400,
601+
"error" : {
602+
"type" : "some unrecognized type",
603+
"reason":"unrecognized error"
604+
}
605+
}
606+
},
607+
{
608+
"update" : {
609+
"_index" : "foo",
610+
"_type" : "bar",
611+
"_id" : "8",
612+
"status" : 500,
613+
"error" : {
614+
"type" : "some unrecognized type",
615+
"reason":"unrecognized error"
616+
}
617+
}
618+
}
619+
]
620+
}))
621+
622+
begin
623+
failed = false
624+
dummy_extracted_values = []
625+
@handler.handle_error(response, 'atag', chunk, response['items'].length, dummy_extracted_values)
626+
rescue Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchRequestAbortError, Fluent::Plugin::ElasticsearchOutput::RetryStreamError=>e
627+
failed = true
628+
records = [].tap do |records|
629+
next unless e.respond_to?(:retry_stream)
630+
e.retry_stream.each {|time, record| records << record}
631+
end
632+
puts records
633+
assert_equal 3, records.length
634+
assert_equal 2, records[0]['_id']
635+
# upsert is retried in case of conflict error.
636+
assert_equal 3, records[1]['_id']
637+
assert_equal 8, records[2]['_id']
638+
error_ids = @plugin.error_events.collect {|h| h[:record]['_id']}
639+
assert_equal 3, error_ids.length
640+
assert_equal [5, 6, 7], error_ids
641+
@plugin.error_events.collect {|h| h[:error]}.each do |e|
642+
assert_true e.respond_to?(:backtrace)
643+
end
644+
end
645+
assert_true failed
646+
end
525647
end

0 commit comments

Comments
 (0)