Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions logstash-core/lib/logstash/api/commands/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ def pipeline(pipeline_id, options = {})
).reject {|_, v| v.nil?}
if options.fetch(:graph, false)
extended_stats = extract_metrics([:stats, :pipelines, pipeline_id.to_sym, :config], :graph)
decorated_vertices = extended_stats[:graph]["graph"]["vertices"].map { |vertex| decorate_with_cluster_uuids(vertex) }
extended_stats[:graph]["graph"]["vertices"] = decorated_vertices
decorated_vertices = extended_stats[:graph][:graph][:vertices].map { |vertex| decorate_with_cluster_uuids(vertex) }

extended_stats[:graph][:graph][:vertices] = decorated_vertices
metrics.merge!(extended_stats)
end
metrics
Expand Down
20 changes: 16 additions & 4 deletions logstash-core/lib/logstash/api/commands/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,22 @@ def refine_batch_metrics(stats)
current_data_point = stats[:batch][:current]
# FlowMetric (from stats[:batch][:event_count][:average]) returns a composite object containing lifetime/last_1_minute/etc values. In order to get the map of sub-metrics we must use `.value`.
# See: https://github.com/elastic/logstash/blob/279171b79c1f3be5fc85e6e2e4092281e504a6f9/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java#L89
event_count_average_flow_metric = stats[:batch][:event_count][:average].value
event_count_average_lifetime = event_count_average_flow_metric["lifetime"] ? event_count_average_flow_metric["lifetime"].round : 0
byte_size_average_flow_metric = stats[:batch][:byte_size][:average].value
byte_size_average_lifetime = byte_size_average_flow_metric["lifetime"] ? byte_size_average_flow_metric["lifetime"].round : 0
event_count_data_point = stats[:batch][:event_count]
if event_count_data_point
event_count_average_flow_metric = event_count_data_point[:average].value
event_count_average_lifetime = event_count_average_flow_metric["lifetime"] ? event_count_average_flow_metric["lifetime"].round : 0
else
event_count_average_flow_metric = {}
event_count_average_lifetime = 0
end
byte_size_data_point = stats[:batch][:byte_size]
if byte_size_data_point
byte_size_average_flow_metric = byte_size_data_point[:average].value
byte_size_average_lifetime = byte_size_average_flow_metric["lifetime"] ? byte_size_average_flow_metric["lifetime"].round : 0
else
byte_size_average_flow_metric = {}
byte_size_average_lifetime = 0
end
result = {
:event_count => {
# current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value
Expand Down
83 changes: 81 additions & 2 deletions logstash-core/lib/logstash/instrument/metric_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ module LogStash module Instrument
# saved in a retrievable way, this is a wrapper around multiples ConcurrentHashMap
# acting as a tree like structure.
class MetricStore
class NamespacesExpectedError < StandardError; end
class MetricNotFound < StandardError; end
# class NamespacesExpectedError < StandardError; end
# class MetricNotFound < StandardError; end
java_import org.logstash.instrument.metrics.MetricStore::MetricNotFound
java_import org.logstash.instrument.metrics.MetricStore::NamespacesExpectedError

KEY_PATH_SEPARATOR = "/".freeze

Expand All @@ -47,6 +49,9 @@ def initialize
# in the structured hash or when we query it for search or to make
# the result available in the API.
@structured_lookup_mutex = Mutex.new

@java_store = org.logstash.instrument.metrics.MetricStore.new
@use_java_impl = true
end

# This method use the namespace and key to search the corresponding value of
Expand All @@ -66,6 +71,15 @@ def initialize
# to the provided default_value_generator block will be stored.
# @return [Metric] the value as it exists in the tree after this operation
def fetch_or_store(namespaces, key, default_value = nil)
if @use_java_impl
if !block_given?
return @java_store.fetch_or_store(namespaces.map(&:to_s), key.to_s, default_value)
else
block_wrapper = java.util.function.Supplier.impl { || yield(key) }
return @java_store.fetch_or_store(namespaces.map(&:to_s), key.to_s, block_wrapper)
end
end

# We first check in the `@fast_lookup` store to see if we have already see that metrics before,
# This give us a `o(1)` access, which is faster than searching through the structured
# data store (Which is a `o(n)` operation where `n` is the number of element in the namespace and
Expand Down Expand Up @@ -104,6 +118,9 @@ def fetch_or_store(namespaces, key, default_value = nil)
# @param [Array] The path where values should be located
# @return [Hash]
# This method returns the values under a slash-separated path string,
# preserving the full namespace nesting in the result.
#
# @param path [String] the search path, e.g. "node/sashimi/pipelines"
# @return [Hash] symbol-keyed nested hash of the matching metrics
def get_with_path(path)
  if @use_java_impl
    # The Java-backed store returns string keys; convert them back to the
    # symbol-keyed shape that API consumers expect.
    remap_keys_to_sym(@java_store.get_with_path(path))
  else
    get(*key_paths(path))
  end
end

Expand All @@ -112,6 +129,9 @@ def get_with_path(path)
# @param [Array<Symbol>]
# @return [Hash]
def get(*key_paths)
#TODO ? remap key strings to key symbols or the Java get method should work with RubySymbol-s?
return remap_keys_to_sym(@java_store.get(java.util.ArrayList.new(key_paths.map(&:to_s)))) if @use_java_impl

# Normalize the symbols access
key_paths.map(&:to_sym)
new_hash = Hash.new
Expand All @@ -123,13 +143,45 @@ def get(*key_paths)
new_hash
end

# Deeply convert a Hash whose keys are strings into an equivalent Hash
# keyed by symbols. Non-Hash inputs are returned unchanged.
#
# NOTE(review): Hashes nested inside Arrays are NOT descended into — only
# direct Hash values are converted recursively, matching the behavior of
# the original implementation.
#
# @param map [Object] usually a string-keyed Hash returned by the Java-backed store
# @return [Object] a new symbol-keyed Hash, or `map` itself when it is not a Hash
def remap_keys_to_sym(map)
  return map unless map.is_a?(Hash)

  # Both branches of the original if/else assigned to the same key, so the
  # branching collapses to a single ternary on the value.
  map.each_with_object({}) do |(key, value), translated|
    translated[key.to_sym] = value.is_a?(Hash) ? remap_keys_to_sym(value) : value
  end
end
private :remap_keys_to_sym

# Recursively convert a (possibly nested) Ruby Array into a
# java.util.ArrayList of Strings, preserving the nesting structure.
# Leaf elements are stringified via #to_s; nested Arrays become
# nested ArrayLists.
#
# @param list [Array] metric keys, each a scalar or a nested Array of keys
# @return [java.util.ArrayList] list suitable for the Java-backed store API
def remap_nested_to_string(list)
  list.each_with_object(java.util.ArrayList.new(list.size)) do |element, converted|
    converted << (element.is_a?(Array) ? remap_nested_to_string(element) : element.to_s)
  end
end
private :remap_nested_to_string

# Retrieve values like `get`, but don't return them fully nested.
# This means that if you call `get_shallow(:foo, :bar)` the result will not
# be nested inside of `{:foo {:bar => values}`.
#
# @param [Array<Symbol>]
# @return [Hash]
def get_shallow(*key_paths)
return remap_keys_to_sym(@java_store.get_shallow(key_paths.map(&:to_s))) if @use_java_impl

key_paths.reduce(get(*key_paths)) {|acc, p| acc[p]}
end

Expand All @@ -154,6 +206,8 @@ def get_shallow(*key_paths)
# }
# }
def extract_metrics(path, *keys)
return remap_keys_to_sym(@java_store.extract_metrics(java.util.ArrayList.new(path.map(&:to_s)), remap_nested_to_string(keys))) if @use_java_impl

keys.reduce({}) do |acc, k|
# Simplify 1-length keys
k = k.first if k.is_a?(Array) && k.size == 1
Expand Down Expand Up @@ -187,6 +241,10 @@ def extract_metrics(path, *keys)
end

# Checks whether a metric is stored at the exact given path.
#
# @param path [Array<Symbol>] full namespace path including the metric key
# @return [Object, nil] truthy when a metric exists at the path, nil/false otherwise
#   (the Ruby store returns the stored metric itself; the Java store's return
#   shape is delegated — presumably boolean — TODO confirm against MetricStore.java)
def has_metric?(*path)
  return @java_store.has_metric?(java.util.ArrayList.new(path.map(&:to_s))) if @use_java_impl

  @fast_lookup[path]
end

Expand All @@ -196,6 +254,22 @@ def has_metric?(*path)
# @param path [String] The search path for metrics
# @param [Array] The metric for the specific path
def each(path = nil, &block)
if @use_java_impl
if path.nil?
if block_given?
return @java_store.each(&block)
else
return @java_store.each
end
else
if block_given?
return @java_store.each(path, &block)
else
return @java_store.each(path)
end
end
end

metrics = if path.nil?
get_all
else
Expand All @@ -207,6 +281,8 @@ def each(path = nil, &block)
alias_method :all, :each

def prune(path)
return @java_store.prune(path) if @use_java_impl

key_paths = key_paths(path).map(&:to_sym)
@structured_lookup_mutex.synchronize do
keys_to_delete = @fast_lookup.keys.select {|namespace| (key_paths - namespace[0..-2]).empty? }
Expand All @@ -216,11 +292,14 @@ def prune(path)
end

# Number of metrics currently stored (leaf entries, not namespaces).
#
# @return [Integer]
def size
  @use_java_impl ? @java_store.size : @fast_lookup.size
end

private
# Returns every stored metric value as a flat Array, without any
# namespace nesting.
# @return [Array] all metric objects held in the fast-lookup map
def get_all
# used only when java impl is disabled, so no need to provide a call to the java store.
@fast_lookup.values
end

Expand Down
2 changes: 1 addition & 1 deletion logstash-core/spec/logstash/api/commands/node_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
allow(api_service).to receive(:extract_metrics).and_call_original
expect(api_service).to receive(:extract_metrics)
.with([:stats, :pipelines, :main, :config], any_args)
.and_raise(LogStash::Instrument::MetricStore::MetricNotFound)
.and_raise(org.logstash.instrument.metrics.MetricStore::MetricNotFound.new("Simulated MetricNotFound for testing"))
end

it 'does not contain the partially-constructed pipeline' do
Expand Down
10 changes: 8 additions & 2 deletions logstash-core/spec/logstash/instrument/metric_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

it "raise an exception" do
subject.fetch_or_store(namespaces, key, counter)
expect { subject.fetch_or_store(conflicting_namespaces, :new_key, counter) }.to raise_error(LogStash::Instrument::MetricStore::NamespacesExpectedError)
expect { subject.fetch_or_store(conflicting_namespaces, :new_key, counter) }.to raise_error(org.logstash.instrument.metrics.MetricStore::NamespacesExpectedError)
end
end

Expand Down Expand Up @@ -87,16 +87,19 @@
it "retrieves end of of a branch" do
metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => anything)))))))
# expect(metrics).to match(a_hash_including("node" => a_hash_including("sashimi" => a_hash_including("pipelines" => a_hash_including("pipeline01" => a_hash_including("plugins" => a_hash_including("logstash-output-elasticsearch" => anything)))))))
end

it "retrieves branch" do
metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01)
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything)))))
# expect(metrics).to match(a_hash_including("node" => a_hash_including("sashimi" => a_hash_including("pipelines" => a_hash_including("pipeline01" => anything)))))
end

it "allow to retrieve a specific metrics" do
metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch", :event_in)
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => a_hash_including(:event_in => be_kind_of(org.logstash.instrument.metrics.counter.LongCounter)))))))))
# expect(metrics).to match(a_hash_including("node" => a_hash_including("sashimi" => a_hash_including("pipelines" => a_hash_including("pipeline01" => a_hash_including("plugins" => a_hash_including("logstash-output-elasticsearch" => a_hash_including("event_in" => be_kind_of(org.logstash.instrument.metrics.counter.LongCounter)))))))))
end

context "with filtered keys" do
Expand All @@ -119,6 +122,7 @@
context "when the path doesnt exist" do
it "raise an exception" do
expect { subject.get(:node, :sashimi, :dontexist) }.to raise_error(LogStash::Instrument::MetricStore::MetricNotFound, /dontexist/)
# expect { subject.get(:node, :sashimi, :dontexist) }.to raise_error(org.logstash.instrument.metrics.MetricStore::MetricNotFound, /dontexist/)
end
end
end
Expand Down Expand Up @@ -167,6 +171,7 @@
context "when the path doesnt exist" do
it "raise an exception" do
expect { subject.get_with_path("node/sashimi/dontexist, pipeline02 /plugins/logstash-output-elasticsearch/event_in") }.to raise_error(LogStash::Instrument::MetricStore::MetricNotFound, /dontexist/)
# expect { subject.get_with_path("node/sashimi/dontexist, pipeline02 /plugins/logstash-output-elasticsearch/event_in") }.to raise_error(org.logstash.instrument.metrics.MetricStore::MetricNotFound, /dontexist/)
end
end
end
Expand Down Expand Up @@ -195,7 +200,7 @@
:processed_events_in,
[:plugins, :"logstash-output-elasticsearch", :event_in]
)
expect(r[:processed_events_in]).to eql(1)
expect(r[:processed_events_in]).to eql(1)
expect(r[:plugins][:"logstash-output-elasticsearch"][:event_in]).to eql(1)
end

Expand Down Expand Up @@ -279,6 +284,7 @@
expect(subject.get(:node, :sashimi, :pipelines, :pipeline01)).to be_a(Hash)
subject.prune("/node/sashimi/pipelines/pipeline01")
expect { subject.get(:node, :sashimi, :pipelines, :pipeline01) }.to raise_error LogStash::Instrument::MetricStore::MetricNotFound
# expect { subject.get(:node, :sashimi, :pipelines, :pipeline01) }.to raise_error org.logstash.instrument.metrics.MetricStore::MetricNotFound
end

it "should keep other metrics on different path branches" do
Expand Down
Loading
Loading