From ec36553e38fb98f706cb4d1a0f73b3284ed9535f Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 13 Jul 2021 17:51:25 +0200 Subject: [PATCH 01/17] Draft the idea of the checks done at the same time of the license check --- .../outputs/elasticsearch/http_client/pool.rb | 57 +++++++++++++++---- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 1675afbb..e71a0279 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -237,16 +237,21 @@ def healthcheck! @state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta| begin health_check_request(url) - # If no exception was raised it must have succeeded! - logger.warn("Restored connection to ES instance", url: url.sanitized.to_s) - # We reconnected to this node, check its ES version - es_version = get_es_version(url) - @state_mutex.synchronize do - meta[:version] = es_version - set_last_es_version(es_version, url) - - alive = @license_checker.appropriate_license?(self, url) - meta[:state] = alive ? :alive : :dead + if elasticsearch?(url) + # If no exception was raised it must have succeeded! + logger.warn("Restored connection to ES instance", url: url.sanitized.to_s) + # We reconnected to this node, check its ES version + es_version = get_es_version(url) + @state_mutex.synchronize do + meta[:version] = es_version + set_last_es_version(es_version, url) + + alive = @license_checker.appropriate_license?(self, url) + meta[:state] = alive ? :alive : :dead + end + else + logger.info("URL doesn't point to Elasticsearch service", url: url.sanitized.to_s ) + @state_mutex.synchronize { meta[:state] = :dead } end rescue HostUnreachableError, BadResponseCodeError => e logger.warn("Attempted to resurrect connection to dead ES instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message) @@ -254,6 +259,38 @@ def healthcheck! end end + def elasticsearch?(url) + response = perform_request_to_url(url, :get, "/") + return false if response.code == 401 || response.code == 403 + + version_info = LogStash::Json.load(response.body) + return false if version_info['version'].nil? + + version = Gem::Version.new(version_info["version"]['number']) + return false if version < Gem::Version.new('6.0.0') + + if version >= Gem::Version.new('6.0.0') && version < Gem::Version.new('7.0.0') + return valid_tagline?(version_info) + elsif version >= Gem::Version.new('7.0.0') && version < Gem::Version.new('7.14.0') + build_flavour = version_info["version"]['build_flavour'] + return false if build_flavour.nil? || build_flavour != 'default' || !valid_tagline?(version_info) + else + # case >= 7.14 + product_header = response.headers['X-elastic-product'] + return false if product_header.nil? || product_header != 'Elasticsearch' + end + return true + rescue => e + logger.error("Unable to version version information", url: url.sanitized.to_s, exception: e.class, message: e.message) + false + end + + def valid_tagline?(version_info) + tagline = version_info['tagline'] + return false if tagline.nil? || tagline != "You Know, for Search" + true + end + def stop_resurrectionist @resurrectionist.join if @resurrectionist end From 9306b11bca2f912b5a8dac12de04a03b2496a6f6 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 14 Jul 2021 15:31:17 +0200 Subject: [PATCH 02/17] Fixed failing test and store last version in case of bad Elasticsearch --- .../outputs/elasticsearch/http_client/pool.rb | 7 +++- .../elasticsearch/http_client/pool_spec.rb | 34 +++++++++++++++---- spec/unit/outputs/elasticsearch_spec.rb | 2 +- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index e71a0279..cbafdb26 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -251,7 +251,12 @@ def healthcheck! end else logger.info("URL doesn't point to Elasticsearch service", url: url.sanitized.to_s ) - @state_mutex.synchronize { meta[:state] = :dead } + es_version = get_es_version(url) + @state_mutex.synchronize do + meta[:version] = es_version + set_last_es_version(es_version, url) + meta[:state] = :dead + end end rescue HostUnreachableError, BadResponseCodeError => e logger.warn("Attempted to resurrect connection to dead ES instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message) diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index 58511152..e89e835a 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -50,13 +50,16 @@ describe "healthcheck url handling" do let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] } + before(:example) do + expect(adapter).to receive(:perform_request).with(anything, :get, "/", anything, anything) do |url, _, _, _, _| + expect(url.path).to be_empty + end + end context "and not setting healthcheck_path" do it "performs the healthcheck to the root" do - expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _| - expect(method).to eq(:head) + expect(adapter).to receive(:perform_request).with(anything, :head, "/", anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty - expect(req_path).to eq("/") end subject.healthcheck! end @@ -66,10 +69,8 @@ let(:healthcheck_path) { "/my/health" } let(:options) { super().merge(:healthcheck_path => healthcheck_path) } it "performs the healthcheck to the healthcheck_path" do - expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _| - expect(method).to eq(:head) + expect(adapter).to receive(:perform_request).with(anything, :head, eq(healthcheck_path), anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty - expect(req_path).to eq(healthcheck_path) end subject.healthcheck! end @@ -175,8 +176,27 @@ end context "with multiple URLs in the list" do + let(:version_ok) do + class MockResponse + def body + {"tagline" => "You Know, for Search", + "version" => { + "number" => '7.13.0', + "build_flavour" => 'default'} + }.to_json + end + + def code + 200 + end + end + #{ 'body' => response_body.to_json, "code" => 200 } + MockResponse.new + end + before :each do allow(adapter).to receive(:perform_request).with(anything, :head, subject.healthcheck_path, {}, nil) + allow(adapter).to receive(:perform_request).with(anything, :get, subject.healthcheck_path, {}, nil).and_return(version_ok) end let(:initial_urls) { [ ::LogStash::Util::SafeURI.new("http://localhost:9200"), ::LogStash::Util::SafeURI.new("http://localhost:9201"), ::LogStash::Util::SafeURI.new("http://localhost:9202") ] } @@ -240,6 +260,7 @@ describe "license checking" do before(:each) do allow(subject).to receive(:health_check_request) + allow(subject).to receive(:elasticsearch?).and_return(true) end let(:options) do @@ -273,6 +294,7 @@ before(:each) do allow(subject).to receive(:health_check_request) + allow(subject).to receive(:elasticsearch?).and_return(true) end context "if ES doesn't return a valid license" do diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 46821a55..cd0e5165 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -156,7 +156,7 @@ include_examples("an authenticated config") end - context 'claud_auth also set' do + context 'cloud_auth also set' do let(:do_register) { false } # this is what we want to test, so we disable the before(:each) call let(:options) { { "user" => user, "password" => password, "cloud_auth" => "elastic:my-passwd-00" } } From 496a70b3fa3cdd572ec2e083dc8759c1facf9114 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 15 Jul 2021 17:27:22 +0200 Subject: [PATCH 03/17] Added unit test to validate the authenticity rules --- .../outputs/elasticsearch/http_client/pool.rb | 10 +- logstash-output-elasticsearch.gemspec | 1 + .../elasticsearch/http_client/pool_spec.rb | 97 +++++++++++++++++++ 3 files changed, 105 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index cbafdb26..814c05cb 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -265,8 +265,12 @@ def healthcheck! end def elasticsearch?(url) - response = perform_request_to_url(url, :get, "/") - return false if response.code == 401 || response.code == 403 + begin + response = perform_request_to_url(url, :get, "/") + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return false if response.code == 401 || response.code == 403 + raise e + end version_info = LogStash::Json.load(response.body) return false if version_info['version'].nil? @@ -281,7 +285,7 @@ def elasticsearch?(url) return false if build_flavour.nil? || build_flavour != 'default' || !valid_tagline?(version_info) else # case >= 7.14 - product_header = response.headers['X-elastic-product'] + product_header = response.headers['x-elastic-product'] return false if product_header.nil? || product_header != 'Elasticsearch' end return true diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 30697f0f..29147f6e 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -31,6 +31,7 @@ Gem::Specification.new do |s| s.add_development_dependency 'flores' s.add_development_dependency 'cabin', ['~> 0.6'] s.add_development_dependency 'webrick' + s.add_development_dependency 'webmock' # Still used in some specs, we should remove this ASAP s.add_development_dependency 'elasticsearch' end diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index e89e835a..3fa0c0e1 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -1,6 +1,7 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/elasticsearch/http_client" require 'cabin' +require 'webmock/rspec' describe LogStash::Outputs::ElasticSearch::HttpClient::Pool do let(:logger) { Cabin::Channel.get } @@ -341,3 +342,99 @@ def code end end end + +describe "#elasticsearch?" do + let(:logger) { Cabin::Channel.get } + let(:adapter) { LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter.new(logger) } + let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] } + let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests + let(:es_node_versions) { [ "0.0.0" ] } + let(:license_status) { 'active' } + + subject { LogStash::Outputs::ElasticSearch::HttpClient::Pool.new(logger, adapter, initial_urls, options) } + + let(:url) { ::LogStash::Util::SafeURI.new("http://localhost:9200") } + + context "in case HTTP error code" do + it "should fail for 401" do + stub_request(:get, "localhost:9200").to_return(status: 401) + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail for 403" do + stub_request(:get, "localhost:9200").to_return(status: 403) + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster which reply without 'version' field" do + it "should fail" do + stub_request(:get, "localhost:9200").to_return(body: '{"field": "funky"}') + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version < 6.0.0" do + it "should fail" do + stub_request(:get, "localhost:9200").to_return(body: '{"version": {"number": "5.0.0"}}') + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version in [6.0.0..7.0.0)" do + it "must be successful with valid 'tagline'" do + stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "6.5.0"}, "tagline": "You Know, for Search"}') + expect(subject.elasticsearch?(url)).to be true + end + + it "should fail if invalid 'tagline'" do + stub_request(:get, "localhost:9200").to_return(body: '{"version": {"number": "6.5.0"}, "tagline": "You don\'t know"}') + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail if 'tagline' is not present" do + stub_request(:get, "localhost:9200").to_return(body: '{"version": {"number": "6.5.0"}}') + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version in [7.0.0..7.14.0)" do + it "must be successful is 'build_flavour' is 'default' and tagline is correct" do + stub_request(:get, "localhost:9200/").to_return(status: 200, body: '{"version": {"number": "7.5.0", "build_flavour": "default"}, "tagline": "You Know, for Search"}') + expect(subject.elasticsearch?(url)).to be true + end + + it "should fail if 'build_flavour' is not 'default' and tagline is correct" do + stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "7.5.0", "build_flavour": "oss"}, "tagline": "You Know, for Search"}') + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail if 'build_flavour' is not present and tagline is correct" do + stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "7.5.0"}, "tagline": "You Know, for Search"}') + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version >= 7.14.0" do + it "should fail if 'X-elastic-product' header is not present" do + stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "7.14.0"}}') + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail if 'X-elastic-product' header is present but with bad value" do + stub_request(:get, "localhost:9200") + .to_return(status: 200, + headers: {'X-elastic-product' => 'not good'}, + body: '{"version": {"number": "7.14.0"}}') + expect(subject.elasticsearch?(url)).to be false + end + + it "must be successful when 'X-elastic-product' header is present with expected value" do + stub_request(:get, "localhost:9200") + .to_return(status: 200, + headers: {'X-elastic-product': 'Elasticsearch'}, + body: '{"version": {"number": "7.14.0"}}') + expect(subject.elasticsearch?(url)).to be true + end + end +end From 4a60bd10e5825a54ee73d9bfc2bf3a2ebd5b990e Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 16 Jul 2021 12:30:16 +0200 Subject: [PATCH 04/17] Remove WebMock stubbing to avoid interference with other tests --- spec/unit/outputs/elasticsearch/http_client/pool_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index 3fa0c0e1..aec9c55a 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -355,6 +355,8 @@ def code let(:url) { ::LogStash::Util::SafeURI.new("http://localhost:9200") } + after(:all) { WebMock.disable! } + context "in case HTTP error code" do it "should fail for 401" do stub_request(:get, "localhost:9200").to_return(status: 401) From 2e9cdeed266caad87f4eeb2f97fad4c53dc4eb25 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 29 Jul 2021 16:02:25 +0200 Subject: [PATCH 05/17] Removed WebMock used to mock HTTP request response for Elasticsearch authenticy test, switched to direct stubbing --- .../outputs/elasticsearch/http_client/pool.rb | 3 +- .../elasticsearch/http_client/pool_spec.rb | 100 +++++++++++------- 2 files changed, 63 insertions(+), 40 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 814c05cb..e3bccf05 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -285,7 +285,8 @@ def elasticsearch?(url) return false if build_flavour.nil? || build_flavour != 'default' || !valid_tagline?(version_info) else # case >= 7.14 - product_header = response.headers['x-elastic-product'] + lower_headers = response.headers.transform_keys {|key| key.to_s.downcase } + product_header = lower_headers['x-elastic-product'] return false if product_header.nil? || product_header != 'Elasticsearch' end return true diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index aec9c55a..e3aa8f42 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -1,7 +1,6 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/elasticsearch/http_client" require 'cabin' -require 'webmock/rspec' describe LogStash::Outputs::ElasticSearch::HttpClient::Pool do let(:logger) { Cabin::Channel.get } @@ -166,6 +165,20 @@ end end + class MockResponse + attr_reader :code, :headers + + def initialize(code = 200, body = nil, headers = {}) + @code = code + @body = body + @headers = headers + end + + def body + @body.to_json + end + end + describe "connection management" do before(:each) { subject.start } context "with only one URL in the list" do @@ -178,21 +191,11 @@ context "with multiple URLs in the list" do let(:version_ok) do - class MockResponse - def body - {"tagline" => "You Know, for Search", - "version" => { - "number" => '7.13.0', - "build_flavour" => 'default'} - }.to_json - end - - def code - 200 - end - end - #{ 'body' => response_body.to_json, "code" => 200 } - MockResponse.new + MockResponse.new(200, {"tagline" => "You Know, for Search", + "version" => { + "number" => '7.13.0', + "build_flavour" => 'default'} + }) end before :each do @@ -345,7 +348,7 @@ def code describe "#elasticsearch?" do let(:logger) { Cabin::Channel.get } - let(:adapter) { LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter.new(logger) } + let(:adapter) { double("Manticore Adapter") } let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] } let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests let(:es_node_versions) { [ "0.0.0" ] } @@ -355,87 +358,106 @@ def code let(:url) { ::LogStash::Util::SafeURI.new("http://localhost:9200") } - after(:all) { WebMock.disable! } - context "in case HTTP error code" do it "should fail for 401" do - stub_request(:get, "localhost:9200").to_return(status: 401) + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(401)) + expect(subject.elasticsearch?(url)).to be false end it "should fail for 403" do - stub_request(:get, "localhost:9200").to_return(status: 403) + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(status: 403) expect(subject.elasticsearch?(url)).to be false end end context "when connecting to a cluster which reply without 'version' field" do it "should fail" do - stub_request(:get, "localhost:9200").to_return(body: '{"field": "funky"}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(body: {"field" => "funky.com"}.to_json) expect(subject.elasticsearch?(url)).to be false end end context "when connecting to a cluster with version < 6.0.0" do it "should fail" do - stub_request(:get, "localhost:9200").to_return(body: '{"version": {"number": "5.0.0"}}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(200, {"version" => { "number" => "5.0.0"}}.to_json) expect(subject.elasticsearch?(url)).to be false end end context "when connecting to a cluster with version in [6.0.0..7.0.0)" do it "must be successful with valid 'tagline'" do - stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "6.5.0"}, "tagline": "You Know, for Search"}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You Know, for Search"})) expect(subject.elasticsearch?(url)).to be true end it "should fail if invalid 'tagline'" do - stub_request(:get, "localhost:9200").to_return(body: '{"version": {"number": "6.5.0"}, "tagline": "You don\'t know"}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You don't know"})) expect(subject.elasticsearch?(url)).to be false end it "should fail if 'tagline' is not present" do - stub_request(:get, "localhost:9200").to_return(body: '{"version": {"number": "6.5.0"}}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}})) expect(subject.elasticsearch?(url)).to be false end end context "when connecting to a cluster with version in [7.0.0..7.14.0)" do it "must be successful is 'build_flavour' is 'default' and tagline is correct" do - stub_request(:get, "localhost:9200/").to_return(status: 200, body: '{"version": {"number": "7.5.0", "build_flavour": "default"}, "tagline": "You Know, for Search"}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavour": "default"}, "tagline": "You Know, for Search"})) expect(subject.elasticsearch?(url)).to be true end it "should fail if 'build_flavour' is not 'default' and tagline is correct" do - stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "7.5.0", "build_flavour": "oss"}, "tagline": "You Know, for Search"}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavour": "oss"}, "tagline": "You Know, for Search"})) expect(subject.elasticsearch?(url)).to be false end it "should fail if 'build_flavour' is not present and tagline is correct" do - stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "7.5.0"}, "tagline": "You Know, for Search"}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0"}, "tagline": "You Know, for Search"})) expect(subject.elasticsearch?(url)).to be false end end context "when connecting to a cluster with version >= 7.14.0" do it "should fail if 'X-elastic-product' header is not present" do - stub_request(:get, "localhost:9200").to_return(status: 200, body: '{"version": {"number": "7.14.0"}}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}})) expect(subject.elasticsearch?(url)).to be false end it "should fail if 'X-elastic-product' header is present but with bad value" do - stub_request(:get, "localhost:9200") - .to_return(status: 200, - headers: {'X-elastic-product' => 'not good'}, - body: '{"version": {"number": "7.14.0"}}') + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'not good'})) expect(subject.elasticsearch?(url)).to be false end - it "must be successful when 'X-elastic-product' header is present with expected value" do - stub_request(:get, "localhost:9200") - .to_return(status: 200, - headers: {'X-elastic-product': 'Elasticsearch'}, - body: '{"version": {"number": "7.14.0"}}') + it "must be successful when 'X-elastic-product' header is present with 'Elasticsearch' value" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'Elasticsearch'})) expect(subject.elasticsearch?(url)).to be true end end From d483a4a425dff8cb398dfa01e7f6e0c853bdc77d Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 30 Jul 2021 15:32:44 +0200 Subject: [PATCH 06/17] Fixed bad spelled property --- .../outputs/elasticsearch/http_client/pool.rb | 4 ++-- .../elasticsearch/http_client/pool_spec.rb | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index e3bccf05..6642b8d7 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -281,8 +281,8 @@ def elasticsearch?(url) if version >= Gem::Version.new('6.0.0') && version < Gem::Version.new('7.0.0') return valid_tagline?(version_info) elsif version >= Gem::Version.new('7.0.0') && version < Gem::Version.new('7.14.0') - build_flavour = version_info["version"]['build_flavour'] - return false if build_flavour.nil? || build_flavour != 'default' || !valid_tagline?(version_info) + build_flavor = version_info["version"]['build_flavor'] + return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info) else # case >= 7.14 lower_headers = response.headers.transform_keys {|key| key.to_s.downcase } diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index e3aa8f42..7a12696b 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -192,10 +192,10 @@ def body context "with multiple URLs in the list" do let(:version_ok) do MockResponse.new(200, {"tagline" => "You Know, for Search", - "version" => { - "number" => '7.13.0', - "build_flavour" => 'default'} - }) + "version" => { + "number" => '7.13.0', + "build_flavor" => 'default'} + }) end before :each do @@ -417,21 +417,21 @@ def body end context "when connecting to a cluster with version in [7.0.0..7.14.0)" do - it "must be successful is 'build_flavour' is 'default' and tagline is correct" do + it "must be successful is 'build_flavor' is 'default' and tagline is correct" do allow(adapter).to receive(:perform_request) .with(anything, :get, "/", anything, anything) - .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavour": "default"}, "tagline": "You Know, for Search"})) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "default"}, "tagline": "You Know, for Search"})) expect(subject.elasticsearch?(url)).to be true end - it "should fail if 'build_flavour' is not 'default' and tagline is correct" do + it "should fail if 'build_flavor' is not 'default' and tagline is correct" do allow(adapter).to receive(:perform_request) .with(anything, :get, "/", anything, anything) - .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavour": "oss"}, "tagline": "You Know, for Search"})) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "oss"}, "tagline": "You Know, for Search"})) expect(subject.elasticsearch?(url)).to be false end - it "should fail if 'build_flavour' is not present and tagline is correct" do + it "should fail if 'build_flavor' is not present and tagline is correct" do allow(adapter).to receive(:perform_request) .with(anything, :get, "/", anything, anything) .and_return(MockResponse.new(200, {"version": {"number": "7.5.0"}, "tagline": "You Know, for Search"})) From 6ff1eedf47dc533adb6f08f07b63df880caacbbf Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 16 Aug 2021 14:04:29 +0200 Subject: [PATCH 07/17] Updated version --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f90f3d5..3ca12ff0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.2.0 + - Added preflight checks on Elasticsearch [#1026](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1026) + ## 11.1.0 - Feat: add `user-agent` header passed to the Elasticsearch HTTP connection [#1038](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1038) From 7255f0274f5e16ac27e4b7d0b9be92b28963b3d8 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 16 Aug 2021 15:38:04 +0200 Subject: [PATCH 08/17] Switched #healthcheck! Elasticsearch check to raise a ConfigurationError --- .../outputs/elasticsearch/http_client/pool.rb | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 6642b8d7..0f0aa658 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -237,26 +237,19 @@ def healthcheck! @state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta| begin health_check_request(url) - if elasticsearch?(url) - # If no exception was raised it must have succeeded! - logger.warn("Restored connection to ES instance", url: url.sanitized.to_s) - # We reconnected to this node, check its ES version - es_version = get_es_version(url) - @state_mutex.synchronize do - meta[:version] = es_version - set_last_es_version(es_version, url) - - alive = @license_checker.appropriate_license?(self, url) - meta[:state] = alive ? :alive : :dead - end - else - logger.info("URL doesn't point to Elasticsearch service", url: url.sanitized.to_s ) - es_version = get_es_version(url) - @state_mutex.synchronize do - meta[:version] = es_version - set_last_es_version(es_version, url) - meta[:state] = :dead - end + if !elasticsearch?(url) + raise LogStash::ConfigurationError, "Not a valid Elasticsearch" + end + # If no exception was raised it must have succeeded! + logger.warn("Restored connection to ES instance", url: url.sanitized.to_s) + # We reconnected to this node, check its ES version + es_version = get_es_version(url) + @state_mutex.synchronize do + meta[:version] = es_version + set_last_es_version(es_version, url) + + alive = @license_checker.appropriate_license?(self, url) + meta[:state] = alive ? :alive : :dead end rescue HostUnreachableError, BadResponseCodeError => e logger.warn("Attempted to resurrect connection to dead ES instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message) From 61de6e24f52aeec6b622c6037ddf66c1a25f22cb Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 17 Aug 2021 13:12:06 +0200 Subject: [PATCH 09/17] Fixed failing tests --- .../outputs/elasticsearch/http_client/pool_spec.rb | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index 7a12696b..4c08e002 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -61,7 +61,7 @@ expect(adapter).to receive(:perform_request).with(anything, :head, "/", anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty end - subject.healthcheck! + expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Not a valid Elasticsearch") end end @@ -72,7 +72,7 @@ expect(adapter).to receive(:perform_request).with(anything, :head, eq(healthcheck_path), anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty end - subject.healthcheck! + expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Not a valid Elasticsearch") end end end @@ -244,8 +244,14 @@ def body ::LogStash::Util::SafeURI.new("http://otherhost:9201") ] } + let(:valid_response) { MockResponse.new(200, {"tagline" => "You Know, for Search", + "version" => { + "number" => '7.13.0', + "build_flavor" => 'default'} + }) } + before(:each) do - allow(subject).to receive(:perform_request_to_url).and_return(nil) + allow(subject).to receive(:perform_request_to_url).and_return(valid_response) subject.start end From e2d82e65aa0e2654978f604a460e97053077df6b Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 20 Aug 2021 09:14:01 +0200 Subject: [PATCH 10/17] Apply product check only during registration phase --- .ci/Dockerfile.elasticsearch | 2 ++ .../outputs/elasticsearch/http_client/pool.rb | 12 ++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/.ci/Dockerfile.elasticsearch b/.ci/Dockerfile.elasticsearch index ba27fb07..3b7f2f6d 100644 --- a/.ci/Dockerfile.elasticsearch +++ b/.ci/Dockerfile.elasticsearch @@ -11,6 +11,8 @@ RUN rm -f $es_path/config/scripts COPY --chown=elasticsearch:elasticsearch spec/fixtures/test_certs/* $es_path/config/test_certs/ COPY --chown=elasticsearch:elasticsearch .ci/elasticsearch-run.sh $es_path/ +RUN echo "xpack.security.enabled: false" >> $es_yml + RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.enabled: $SECURE_INTEGRATION" >> $es_yml; fi RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.key: $es_path/config/test_certs/test.key" >> $es_yml; fi RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.certificate: $es_path/config/test_certs/test.crt" >> $es_yml; fi diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 0f0aa658..35baae0a 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -211,7 +211,7 @@ def sniffer_alive? def start_resurrectionist @resurrectionist = Thread.new do until_stopped("resurrection", @resurrect_delay) do - healthcheck! + healthcheck!(false) end end end @@ -232,13 +232,17 @@ def health_check_request(url) perform_request_to_url(url, :head, @healthcheck_path) end - def healthcheck! + def healthcheck!(register_phase = true) # Try to keep locking granularity low such that we don't affect IO... @state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta| begin health_check_request(url) - if !elasticsearch?(url) - raise LogStash::ConfigurationError, "Not a valid Elasticsearch" + + # when called from resurrectionist skip the product check done during register phase + if register_phase + if !elasticsearch?(url) + raise LogStash::ConfigurationError, "Not a valid Elasticsearch" + end end # If no exception was raised it must have succeeded! logger.warn("Restored connection to ES instance", url: url.sanitized.to_s) From 35902e00bd7a56fb0513178286c1b736b6b6de60 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 23 Sep 2021 09:49:28 +0200 Subject: [PATCH 11/17] Reused root URI string and avoided creation of Gem::Verson objects --- lib/logstash/outputs/elasticsearch/http_client/pool.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 35baae0a..9d6546fd 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -37,6 +37,9 @@ def message ROOT_URI_PATH = '/'.freeze LICENSE_PATH = '/_license'.freeze + VERSION_6_TO_7 = Gem::Requirement.new([">= 6.0.0", "< 7.0.0"]) + VERSION_7_TO_7_14 = Gem::Requirement.new([">= 7.0.0", "< 7.14.0"]) + DEFAULT_OPTIONS = { :healthcheck_path => ROOT_URI_PATH, :sniffing_path => "/_nodes/http", @@ -263,7 +266,7 @@ def healthcheck!(register_phase = true) def elasticsearch?(url) begin - response = perform_request_to_url(url, :get, "/") + response = perform_request_to_url(url, :get, ROOT_URI_PATH) rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e return false if response.code == 401 || response.code == 403 raise e @@ -275,9 +278,9 @@ def elasticsearch?(url) version = Gem::Version.new(version_info["version"]['number']) return false if version < Gem::Version.new('6.0.0') - if version >= Gem::Version.new('6.0.0') && version < Gem::Version.new('7.0.0') + if VERSION_6_TO_7.satisfied_by?(version) return valid_tagline?(version_info) - elsif version >= Gem::Version.new('7.0.0') && version < Gem::Version.new('7.14.0') + elsif VERSION_7_TO_7_14.satisfied_by?(version) build_flavor = version_info["version"]['build_flavor'] return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info) else From 710577f0ef7b18e51145222bc04179cbb498a860 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 29 Sep 2021 12:36:23 +0200 Subject: [PATCH 12/17] Simplify check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- lib/logstash/outputs/elasticsearch/http_client/pool.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 9d6546fd..522b89e1 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -297,8 +297,7 @@ def elasticsearch?(url) def valid_tagline?(version_info) tagline = version_info['tagline'] - return false if tagline.nil? || tagline != "You Know, for Search" - true + tagline == "You Know, for Search" end def stop_resurrectionist From 8b4126e7e1ac01f4d92e7727c4cd280cd779fdcc Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 29 Sep 2021 12:44:02 +0200 Subject: [PATCH 13/17] Simplify if condition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- lib/logstash/outputs/elasticsearch/http_client/pool.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 522b89e1..9ffebc61 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -287,7 +287,7 @@ def elasticsearch?(url) # case >= 7.14 lower_headers = response.headers.transform_keys {|key| key.to_s.downcase } product_header = lower_headers['x-elastic-product'] - return false if product_header.nil? || product_header != 'Elasticsearch' + return false if product_header != 'Elasticsearch' end return true rescue => e From 1c2f467c2115ac558b3179b5a373ca97c2d3634f Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 29 Sep 2021 17:40:13 +0200 Subject: [PATCH 14/17] Changed error message to something more useful --- lib/logstash/outputs/elasticsearch/http_client/pool.rb | 2 +- spec/unit/outputs/elasticsearch/http_client/pool_spec.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 9ffebc61..50f75115 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -244,7 +244,7 @@ def healthcheck!(register_phase = true) # when called from resurrectionist skip the product check done during register phase if register_phase if !elasticsearch?(url) - raise LogStash::ConfigurationError, "Not a valid Elasticsearch" + raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch" end end # If no exception was raised it must have succeeded! diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index 4c08e002..edf3b341 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -61,7 +61,7 @@ expect(adapter).to receive(:perform_request).with(anything, :head, "/", anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty end - expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Not a valid Elasticsearch") + expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch") end end @@ -72,7 +72,7 @@ expect(adapter).to receive(:perform_request).with(anything, :head, eq(healthcheck_path), anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty end - expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Not a valid Elasticsearch") + expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch") end end end From e590e04f8e23666fd9d127e7810cdc062c7ef031 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 29 Sep 2021 17:44:59 +0200 Subject: [PATCH 15/17] Changed the version to 11.2.0 --- logstash-output-elasticsearch.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 29147f6e..6600c9ac 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.1.0' + s.version = '11.2.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" From 83d644dcee183c45ec80721d5379c15fc4ff0e71 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 1 Oct 2021 09:21:33 +0200 Subject: [PATCH 16/17] Removed disable of xpack.security --- .ci/Dockerfile.elasticsearch | 2 -- lib/logstash/outputs/elasticsearch/http_client/pool.rb | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.ci/Dockerfile.elasticsearch b/.ci/Dockerfile.elasticsearch index 3b7f2f6d..ba27fb07 100644 --- a/.ci/Dockerfile.elasticsearch +++ b/.ci/Dockerfile.elasticsearch @@ -11,8 +11,6 @@ RUN rm -f $es_path/config/scripts COPY --chown=elasticsearch:elasticsearch spec/fixtures/test_certs/* $es_path/config/test_certs/ COPY --chown=elasticsearch:elasticsearch .ci/elasticsearch-run.sh $es_path/ -RUN echo "xpack.security.enabled: false" >> $es_yml - RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.enabled: $SECURE_INTEGRATION" >> $es_yml; fi RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.key: $es_path/config/test_certs/test.key" >> $es_yml; fi RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.certificate: $es_path/config/test_certs/test.crt" >> $es_yml; fi diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 50f75115..3a7b136a 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -297,7 +297,7 @@ def elasticsearch?(url) def valid_tagline?(version_info) tagline = version_info['tagline'] - tagline == "You Know, for Search" + tagline == "You Know, for Search" end def stop_resurrectionist From e8d3d7ddc721f7878d8b61f17c5b0f66a12bdc50 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Mon, 11 Oct 2021 18:49:22 +0200 Subject: [PATCH 17/17] Fixed error message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- lib/logstash/outputs/elasticsearch/http_client/pool.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 3a7b136a..a293dfd0 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -291,7 +291,7 @@ def elasticsearch?(url) end return true rescue => e - logger.error("Unable to version version information", url: url.sanitized.to_s, exception: e.class, message: e.message) + logger.error("Unable to retrieve Elasticsearch version", url: url.sanitized.to_s, exception: e.class, message: e.message) false end