diff --git a/logstash-core/lib/logstash/api/commands/node.rb b/logstash-core/lib/logstash/api/commands/node.rb index db144478140..111b243c3f8 100644 --- a/logstash-core/lib/logstash/api/commands/node.rb +++ b/logstash-core/lib/logstash/api/commands/node.rb @@ -56,8 +56,9 @@ def pipeline(pipeline_id, options = {}) ).reject {|_, v| v.nil?} if options.fetch(:graph, false) extended_stats = extract_metrics([:stats, :pipelines, pipeline_id.to_sym, :config], :graph) - decorated_vertices = extended_stats[:graph]["graph"]["vertices"].map { |vertex| decorate_with_cluster_uuids(vertex) } - extended_stats[:graph]["graph"]["vertices"] = decorated_vertices + decorated_vertices = extended_stats[:graph][:graph][:vertices].map { |vertex| decorate_with_cluster_uuids(vertex) } + + extended_stats[:graph][:graph][:vertices] = decorated_vertices metrics.merge!(extended_stats) end metrics diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index 0d8d34e66cf..69cb9c957dc 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -178,10 +178,22 @@ def refine_batch_metrics(stats) current_data_point = stats[:batch][:current] # FlowMetric (from stats[:batch][:event_count][:average]) returns a composite object containing lifetime/last_1_minute/etc values. In order to get the map of sub-metrics we must use `.value`. # See: https://github.com/elastic/logstash/blob/279171b79c1f3be5fc85e6e2e4092281e504a6f9/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java#L89 - event_count_average_flow_metric = stats[:batch][:event_count][:average].value - event_count_average_lifetime = event_count_average_flow_metric["lifetime"] ? event_count_average_flow_metric["lifetime"].round : 0 - byte_size_average_flow_metric = stats[:batch][:byte_size][:average].value - byte_size_average_lifetime = byte_size_average_flow_metric["lifetime"] ? byte_size_average_flow_metric["lifetime"].round : 0 + event_count_data_point = stats[:batch][:event_count] + if event_count_data_point + event_count_average_flow_metric = event_count_data_point[:average].value + event_count_average_lifetime = event_count_average_flow_metric["lifetime"] ? event_count_average_flow_metric["lifetime"].round : 0 + else + event_count_average_flow_metric = {} + event_count_average_lifetime = 0 + end + byte_size_data_point = stats[:batch][:byte_size] + if byte_size_data_point + byte_size_average_flow_metric = byte_size_data_point[:average].value + byte_size_average_lifetime = byte_size_average_flow_metric["lifetime"] ? byte_size_average_flow_metric["lifetime"].round : 0 + else + byte_size_average_flow_metric = {} + byte_size_average_lifetime = 0 + end result = { :event_count => { # current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value diff --git a/logstash-core/lib/logstash/instrument/metric_store.rb b/logstash-core/lib/logstash/instrument/metric_store.rb index 40a0080a38a..7c7458ee8c9 100644 --- a/logstash-core/lib/logstash/instrument/metric_store.rb +++ b/logstash-core/lib/logstash/instrument/metric_store.rb @@ -24,8 +24,10 @@ module LogStash module Instrument # saved in a retrievable way, this is a wrapper around multiples ConcurrentHashMap # acting as a tree like structure. class MetricStore - class NamespacesExpectedError < StandardError; end - class MetricNotFound < StandardError; end + # class NamespacesExpectedError < StandardError; end + # class MetricNotFound < StandardError; end + java_import org.logstash.instrument.metrics.MetricStore::MetricNotFound + java_import org.logstash.instrument.metrics.MetricStore::NamespacesExpectedError KEY_PATH_SEPARATOR = "/".freeze @@ -47,6 +49,9 @@ def initialize # in the structured hash or when we query it for search or to make # the result available in the API. @structured_lookup_mutex = Mutex.new + + @java_store = org.logstash.instrument.metrics.MetricStore.new + @use_java_impl = true end # This method use the namespace and key to search the corresponding value of @@ -66,6 +71,15 @@ def initialize # to the provided default_value_generator block will be stored. # @return [Metric] the value as it exists in the tree after this operation def fetch_or_store(namespaces, key, default_value = nil) + if @use_java_impl + if !block_given? + return @java_store.fetch_or_store(namespaces.map(&:to_s), key.to_s, default_value) + else + block_wrapper = java.util.function.Supplier.impl { || yield(key) } + return @java_store.fetch_or_store(namespaces.map(&:to_s), key.to_s, block_wrapper) + end + end + # We first check in the `@fast_lookup` store to see if we have already see that metrics before, # This give us a `o(1)` access, which is faster than searching through the structured # data store (Which is a `o(n)` operation where `n` is the number of element in the namespace and @@ -104,6 +118,9 @@ def fetch_or_store(namespaces, key, default_value = nil) # @param [Array] The path where values should be located # @return [Hash] def get_with_path(path) + return remap_keys_to_sym(@java_store.get_with_path(path)) if @use_java_impl + # return @java_store.get_with_path(java.util.ArrayList.new(path.map(&:to_s))) if @use_java_impl + get(*key_paths(path)) end @@ -112,6 +129,9 @@ def get_with_path(path) # @param [Array] # @return [Hash] def get(*key_paths) + #TODO ? remap key strings to key symbols or the Java get method should work with RubySymbol-s? + return remap_keys_to_sym(@java_store.get(java.util.ArrayList.new(key_paths.map(&:to_s)))) if @use_java_impl + # Normalize the symbols access key_paths.map(&:to_sym) new_hash = Hash.new @@ -123,6 +143,36 @@ def get(*key_paths) new_hash end + # Deeply convert map where keys are strings to symbols + def remap_keys_to_sym(map) + return map unless map.is_a?(Hash) + + translated_map = {} + map.each do |key, value| + if value.is_a?(Hash) + translated_map[key.to_sym] = remap_keys_to_sym(value) + else + translated_map[key.to_sym] = value + end + #map.delete(key) + end + translated_map + end + private :remap_keys_to_sym + + def remap_nested_to_string(list) + result = java.util.ArrayList.new(list.size) + list.each do |v| + if v.is_a?(Array) + result << remap_nested_to_string(v) + else + result << v.to_s + end + end + result + end + private :remap_nested_to_string + # Retrieve values like `get`, but don't return them fully nested. # This means that if you call `get_shallow(:foo, :bar)` the result will not # be nested inside of `{:foo {:bar => values}`. @@ -130,6 +180,8 @@ def get(*key_paths) # @param [Array] # @return [Hash] def get_shallow(*key_paths) + return remap_keys_to_sym(@java_store.get_shallow(key_paths.map(&:to_s))) if @use_java_impl + key_paths.reduce(get(*key_paths)) {|acc, p| acc[p]} end @@ -154,6 +206,8 @@ def get_shallow(*key_paths) # } # } def extract_metrics(path, *keys) + return remap_keys_to_sym(@java_store.extract_metrics(java.util.ArrayList.new(path.map(&:to_s)), remap_nested_to_string(keys))) if @use_java_impl + keys.reduce({}) do |acc, k| # Simplify 1-length keys k = k.first if k.is_a?(Array) && k.size == 1 @@ -187,6 +241,10 @@ def extract_metrics(path, *keys) end def has_metric?(*path) + if @use_java_impl + return @java_store.has_metric?(java.util.ArrayList.new(path.map(&:to_s))) + end + @fast_lookup[path] end @@ -196,6 +254,22 @@ def has_metric?(*path) # @param path [String] The search path for metrics # @param [Array] The metric for the specific path def each(path = nil, &block) + if @use_java_impl + if path.nil? + if block_given? + return @java_store.each(&block) + else + return @java_store.each + end + else + if block_given? + return @java_store.each(path, &block) + else + return @java_store.each(path) + end + end + end + metrics = if path.nil? get_all else @@ -207,6 +281,8 @@ def each(path = nil, &block) alias_method :all, :each def prune(path) + return @java_store.prune(path) if @use_java_impl + key_paths = key_paths(path).map(&:to_sym) @structured_lookup_mutex.synchronize do keys_to_delete = @fast_lookup.keys.select {|namespace| (key_paths - namespace[0..-2]).empty? } @@ -216,11 +292,14 @@ def prune(path) end def size + return @java_store.size if @use_java_impl + @fast_lookup.size end private def get_all + # used only when java impl is disabled, so no need to provide a call to the java store. @fast_lookup.values end diff --git a/logstash-core/spec/logstash/api/commands/node_spec.rb b/logstash-core/spec/logstash/api/commands/node_spec.rb index 1baa130c435..6f54d321ee4 100644 --- a/logstash-core/spec/logstash/api/commands/node_spec.rb +++ b/logstash-core/spec/logstash/api/commands/node_spec.rb @@ -75,7 +75,7 @@ allow(api_service).to receive(:extract_metrics).and_call_original expect(api_service).to receive(:extract_metrics) .with([:stats, :pipelines, :main, :config], any_args) - .and_raise(LogStash::Instrument::MetricStore::MetricNotFound) + .and_raise(org.logstash.instrument.metrics.MetricStore::MetricNotFound.new("Simulated MetricNotFound for testing")) end it 'does not contain the partially-constructed pipeline' do diff --git a/logstash-core/spec/logstash/instrument/metric_store_spec.rb b/logstash-core/spec/logstash/instrument/metric_store_spec.rb index c0f5d1b7a5b..2fc1850c4a1 100644 --- a/logstash-core/spec/logstash/instrument/metric_store_spec.rb +++ b/logstash-core/spec/logstash/instrument/metric_store_spec.rb @@ -46,7 +46,7 @@ it "raise an exception" do subject.fetch_or_store(namespaces, key, counter) - expect { subject.fetch_or_store(conflicting_namespaces, :new_key, counter) }.to raise_error(LogStash::Instrument::MetricStore::NamespacesExpectedError) + expect { subject.fetch_or_store(conflicting_namespaces, :new_key, counter) }.to raise_error(org.logstash.instrument.metrics.MetricStore::NamespacesExpectedError) end end @@ -87,16 +87,19 @@ it "retrieves end of of a branch" do metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch") expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => anything))))))) + # expect(metrics).to match(a_hash_including("node" => a_hash_including("sashimi" => a_hash_including("pipelines" => a_hash_including("pipeline01" => a_hash_including("plugins" => a_hash_including("logstash-output-elasticsearch" => anything))))))) end it "retrieves branch" do metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01) expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything))))) + # expect(metrics).to match(a_hash_including("node" => a_hash_including("sashimi" => a_hash_including("pipelines" => a_hash_including("pipeline01" => anything))))) end it "allow to retrieve a specific metrics" do metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch", :event_in) expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => a_hash_including(:event_in => be_kind_of(org.logstash.instrument.metrics.counter.LongCounter))))))))) + # expect(metrics).to match(a_hash_including("node" => a_hash_including("sashimi" => a_hash_including("pipelines" => a_hash_including("pipeline01" => a_hash_including("plugins" => a_hash_including("logstash-output-elasticsearch" => a_hash_including("event_in" => be_kind_of(org.logstash.instrument.metrics.counter.LongCounter))))))))) end context "with filtered keys" do @@ -119,6 +122,7 @@ context "when the path doesnt exist" do it "raise an exception" do expect { subject.get(:node, :sashimi, :dontexist) }.to raise_error(LogStash::Instrument::MetricStore::MetricNotFound, /dontexist/) + # expect { subject.get(:node, :sashimi, :dontexist) }.to raise_error(org.logstash.instrument.metrics.MetricStore::MetricNotFound, /dontexist/) end end end @@ -167,6 +171,7 @@ context "when the path doesnt exist" do it "raise an exception" do expect { subject.get_with_path("node/sashimi/dontexist, pipeline02 /plugins/logstash-output-elasticsearch/event_in") }.to raise_error(LogStash::Instrument::MetricStore::MetricNotFound, /dontexist/) + # expect { subject.get_with_path("node/sashimi/dontexist, pipeline02 /plugins/logstash-output-elasticsearch/event_in") }.to raise_error(org.logstash.instrument.metrics.MetricStore::MetricNotFound, /dontexist/) end end end @@ -195,7 +200,7 @@ :processed_events_in, [:plugins, :"logstash-output-elasticsearch", :event_in] ) - expect(r[:processed_events_in]).to eql(1) + expect(r[:processed_events_in]).to eql(1) expect(r[:plugins][:"logstash-output-elasticsearch"][:event_in]).to eql(1) end @@ -279,6 +284,7 @@ expect(subject.get(:node, :sashimi, :pipelines, :pipeline01)).to be_a(Hash) subject.prune("/node/sashimi/pipelines/pipeline01") expect { subject.get(:node, :sashimi, :pipelines, :pipeline01) }.to raise_error LogStash::Instrument::MetricStore::MetricNotFound + # expect { subject.get(:node, :sashimi, :pipelines, :pipeline01) }.to raise_error org.logstash.instrument.metrics.MetricStore::MetricNotFound end it "should keep other metrics on different path branches" do diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricStore.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricStore.java new file mode 100644 index 00000000000..55a11b61b38 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricStore.java @@ -0,0 +1,431 @@ +package org.logstash.instrument.metrics; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jruby.anno.JRubyMethod; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public final class MetricStore { + + static final Logger LOGGER = LogManager.getLogger(MetricStore.class); + + public static class NamespacesExpectedError extends RuntimeException { + private static final long serialVersionUID = 4456147903096417842L; + + NamespacesExpectedError(String message) { + super(message); + } + } + + public static class MetricNotFound extends RuntimeException { + private static final long serialVersionUID = -6471818257968802560L; + + public MetricNotFound(String message) { + super(message); + } + } + + // We keep the structured cache to allow the api to search the content of the different nodes. + // Value could be a metric or another layer (ConcurrentMap) + private final ConcurrentMap store = new ConcurrentHashMap<>(); + + // This hash has only one dimension and allow fast retrieval of the metrics. + private final ConcurrentMap, Metric> fastLookup = new ConcurrentHashMap<>(); + + /** + * This Mutex block the critical section for the + * structured hash, it block the zone when we first insert a metric + * in the structured hash or when we query it for search or to make + * the result available in the API. + */ + private final ReentrantLock lock = new ReentrantLock(); + +// public Metric fetchOrStore(List namespacesPath, RubySymbol key, final Metric metric) { + public Metric fetchOrStore(List namespacesPath, String key, final Metric metric) { + return fetchOrStore(namespacesPath, key, () -> metric); + } + + /** + * This method use the namespace and key to search the corresponding value of + * the hash, if it doesn't exist it will create the appropriate namespaces + * path in the hash and return `new_value`. + * + * @param namespacesPath + * The path where the values should be located. + * @param key + * The metric key. + * @param metricGenerator + * The function to be invoked to generate the metric if doesn't existing on the kay at the namespacePath. + * @return Metric instance for the namespace path and key provided. + */ + public Metric fetchOrStore(List namespacesPath, String key, Supplier> metricGenerator) { + // We first check in the `fastLookup` store to see if we have already see that metrics before, + // This give us a `O(1)` access, which is faster than searching through the structured + // data store (Which is a `O(n)` operation where `n` is the number of element in the namespace and + // the value of the key). If the metric is already present in the `fastLookup`, then that value is sent + // back directly to the caller. + + List namespacePathConverted = namespacesPath /*convertToJavaPath(namespacesPath)*/; + + List fastLookupKey = new ArrayList<>(namespacePathConverted); +// fastLookupKey.add(key.asJavaString()); + fastLookupKey.add(key); + + Metric existingValue = fastLookup.get(fastLookupKey); + if (existingValue != null) { + return existingValue; + } + + // BUT. If the value was not present in the `fastLookup` we acquire the lock + // before modifying _either_ the fast-lookup or the structured store. + lock.lock(); + try { + // by using compute_if_absent, we ensure that we don't overwrite a value that was + // written by another thread that beat us to the lock. + return fastLookup.computeIfAbsent(fastLookupKey, k -> { + Metric generated = metricGenerator.get(); + fetchOrStoreNamespaces(namespacePathConverted).putIfAbsent(key/*.asJavaString()*/, generated); + return generated; + }); + } finally { + lock.unlock(); + } + } + +// private static List convertToJavaPath(List namespacesPath) { +// return namespacesPath.stream().map(rs -> rs.asJavaString()).collect(Collectors.toList()); +// } + + // Keys is a list of string, but can also contain maps and submaps + @SuppressWarnings("unchecked") + public Map extractMetrics(List path, List keys) { + Map acc = new HashMap<>(); + for (Object key : keys) { + // Simplify 1-length keys + if ((key instanceof List) && ((List) key).size() == 1) { + key = ((List) key).getFirst(); + } + + // If we have list values here we need to recurse. + // There are two levels of looping here, one for the paths we might pass in, + // one for the upcoming keys we might pass in. + // TODO alternatively check also RubyArray + if (key instanceof List) { + // We need to build up future executions to extractMetrics + // which means building up the path and keys arguments. + // We need a nested loop here to execute all permutations of these in case we hit + // something like [["a", "b"],["c", "d"]] which produces 4 different metrics + List castedKey = (List) key; + List nextPaths = wrapWithListIfScalar(castedKey.getFirst()); + List nextKeys = wrapWithListIfScalar(castedKey.get(1)); + List rest = castedKey.subList(2, castedKey.size()); + + for (Object nextPath : nextPaths) { + // If there already is a hash at this location use that so we don't overwrite it + Map npMap = (Map) acc.getOrDefault(nextPath, new HashMap<>()); + + // combine recursion key as path + nextPath + List nextPathRec = new ArrayList<>(path); + nextPathRec.add(nextPath.toString()); + + for (Object nextKey : nextKeys) { + // combine recursion key as nextKey + [rest] + List keysRec = new ArrayList<>(); + keysRec.add(nextKey); + keysRec.addAll(rest); + // wrap inside a List because ruby code use a splat operator that wrap all remaining params into an array (*keys) + Map recMap = extractMetrics(nextPathRec, Collections.singletonList(keysRec)); + + // merge recMap into npMap replacing the existing keys + for (Map.Entry entry : recMap.entrySet()) { + npMap.put(entry.getKey(), entry.getValue()); + } + } + acc.put(nextPath.toString(), npMap); + } + } else { + // scalar value, key is a string + String castedKey = (String) key; + + // copy the path because it's modified by getRecursively! + Object value = getShallow(new ArrayList<>(path)); + // we give for granted that value is a map + Object m = ((Map) value).get(castedKey); + acc.put(castedKey, m != null ? ((Metric) m).getValue() : null); + } + } + + return acc; + } + + @SuppressWarnings("unchecked") + private static List wrapWithListIfScalar(Object toWrap) { + if (toWrap instanceof List) { + return (List) toWrap; + } + return Collections.singletonList(toWrap); + } + + + /** + * Use the path to navigate the metrics tree and return what it matches, returns a Map or Metric. + * + * Retrieve values like `get`, but don't return them fully nested. + * This means that if you call `getShallow(["foo", "bar"])` the result will not + * be nested inside of `{"foo" {"bar" => values}`. + * */ + @SuppressWarnings("unchecked") + public Object getShallow(List path) { + // save a copy because get method modifies the instance, so it cleans up removing the head at each step. + ArrayList savedPath = new ArrayList<>(path); + Map acc = get(path); + for (String key : savedPath) { + Object next = acc.get(key); + if (!(next instanceof Map)) { + return next; + } + + acc = (Map) next; + } + return acc; + } + + @JRubyMethod(name = "has_metric?") + public boolean hasMetric(List path) { + return fastLookup.containsKey(path); + } + + public int size() { + return fastLookup.size(); + } + + public List> each() { + return getAll(); + } + + public List> each(Consumer> processor) { + List> result = getAll(); + result.forEach(processor); + return result; + } + + public List> each(String path) { + return transformToArray(getWithPath(path)); + } + + public List> each(String path, Consumer> processor) { + List> result = transformToArray(getWithPath(path)); + result.forEach(processor); + return result; + } + + @SuppressWarnings("unchecked") + public static List> transformToArray(Map map) { + List> result = new ArrayList<>(); + + for (Object value : map.values()) { + if (value instanceof Map) { + // Recursive call for nested map + result.addAll(transformToArray((Map) value)); + } else { + // TODO potential casting runtime error + result.add((Metric) value); + } + } + + return result; + } + + private List> getAll() { + return new ArrayList<>(fastLookup.values()); + } + + public void prune(String path) { + List keyPaths = keyPaths(path); + lock.lock(); + try { + List> keysToDelete = fastLookup.keySet() + .stream() + .filter(namespace -> keyMatch(namespace, keyPaths)) + .collect(Collectors.toList()); + keysToDelete.forEach(fastLookup::remove); + deleteFromMap(store, keyPaths); + } finally { + lock.unlock(); + } + } + + @SuppressWarnings("unchecked") + private void deleteFromMap(Map map, List keys) { + String key = keys.get(0); + + if (keys.size() == 1) { + // If it's the last key, remove the entry from the map + map.remove(key); + return; + } + + // Retrieve the value associated with the key + Object nestedObject = map.get(key); + if (nestedObject == null) { + return; + } + + // Check if the retrieved value is a nested map + if (nestedObject instanceof Map) { + // Safely cast the nested map and proceed recursively + deleteFromMap((Map) nestedObject, keys.subList(1, keys.size())); + } + } + + private static boolean keyMatch(List namespace, List keyPaths) { + return keyPaths.containsAll(namespace.subList(0, namespace.size() - 1)); + } + + /** + * This method allow to retrieve values for a specific path, + * This method support the following queries: + * + * stats/pipelines/pipeline_X + * stats/pipelines/pipeline_X,pipeline_2 + * stats/os,jvm + * + * If you use the `,` on a key the metric store will return the both values at that level + * + * The returned hash will keep the same structure as it had in the `ConcurrentMap` + * but will be a normal ruby hash. This will allow the api to easily serialize the content + * of the map. + * */ + public Map getWithPath(String path) { + return get(keyPaths(path)); + } + + public Map get(List keyPaths) { + lock.lock(); + try { + return getRecursively(keyPaths, store, new HashMap<>()); + } finally { + lock.unlock(); + } + } + + /** + * Split the string representing a path like /jvm/process into the tokens list [jvm, process] + */ + private List keyPaths(String path) { + // returned path has to be modifiable, so thw wrap into an ArrayList + return new ArrayList<>(Arrays.asList(path.replaceAll("^\\/+", "").split("/"))); + } + + /** + * This method take an array of keys and recursively search the metric store structure + * and return a filtered hash of the structure. This method also take into consideration + * getting two different branches. + * If one part of the `key_paths` contains a filter key with the following format + * "pipeline01, pipeline_02", It know that need to fetch the branch `pipeline01` and `pipeline02`. + * + * @param keyPaths + * The list of keys part to filter. + * @param map + * The part of map to search in. + * @param newHash + * The hash to populate with the results. + * @return the newHash. + */ + @SuppressWarnings("unchecked") + private Map getRecursively(List keyPaths, ConcurrentMap map, Map newHash) { + String[] keyCandidates = extractFilterKeys(keyPaths.getFirst()); + + // shift left the paths +// keyPaths = keyPaths.subList(1, keyPaths.size()); + keyPaths.remove(0); + + for (String keyCandidate : keyCandidates) { + if (!map.containsKey(keyCandidate)) { + throw new MetricNotFound(String.format("For path: %s. Map keys: %s", keyCandidate, map.keySet())); + } + + if (keyPaths.isEmpty()) { + // End of the user requested path, breaks the recursion + if (map.get(keyCandidate) instanceof ConcurrentMap) { + newHash.put(keyCandidate, transformToHash((ConcurrentMap) map.get(keyCandidate))); + } else { + newHash.put(keyCandidate, map.get(keyCandidate)); + } + } else { + if (map.get(keyCandidate) instanceof ConcurrentMap) { + newHash.put(keyCandidate, getRecursively(keyPaths, (ConcurrentMap) map.get(keyCandidate), new HashMap<>())); + } else { + newHash.put(keyCandidate, map.get(keyCandidate)); + } + } + } + return newHash; + } + + /** + * Transform the ConcurrentMap hash into a Map format, + * This is used to be serialize at the web api layer. + * */ + @SuppressWarnings( "unchecked") + private static Map transformToHash(Map map, Map newHash) { + map.forEach((key, value) -> { + if (value instanceof Map) { + // If the value is a nested map, initialize a new HashMap and recurse + Map nestedMap = new HashMap<>(); + newHash.put(key, nestedMap); + transformToHash((Map) value, nestedMap); + } else { + // Otherwise, directly copy the value to the new map + newHash.put(key, value); + } + }); + + return newHash; + } + + public static Map transformToHash(Map map) { + // Start by providing the initial HashMap as an empty map + return transformToHash(map, new HashMap<>()); + } + + private String[] extractFilterKeys(String key) { + return key.strip().split("\\s*,\\s*"); + } + + /** This method iterate through the namespace path and try to find the corresponding + * value for the path, if any part of the path is not found it will + * create it. + * + * @param namespacesPath + * The path where values should be located. + * @throws NamespacesExpectedError + * Throws if the retrieved object isn't a `Concurrent::Map`. + * @return + * Map where the metrics should be saved. The returned map could contain Metric or another layer of ConcurrentMap. + */ + @SuppressWarnings("unchecked") + private ConcurrentMap fetchOrStoreNamespaces(List namespacesPath) { + int index = 0; + ConcurrentMap node = store; + for (String namespace : namespacesPath) { + Object newNode = node.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>()); + if (! (newNode instanceof ConcurrentMap)) { + final String error = String.format("Expecting a `Namespaces` but found class: %s for namespaces_path: #{namespaces_path.first(index + 1)}", + node.getClass().getName(), namespacesPath.subList(0, index + 1)); + throw new NamespacesExpectedError(error); + } + node = (ConcurrentMap) newNode; + + index++; + } + return node; + } +} diff --git a/logstash-core/src/test/java/org/logstash/instrument/metrics/MetricStoreTest.java b/logstash-core/src/test/java/org/logstash/instrument/metrics/MetricStoreTest.java new file mode 100644 index 00000000000..4b4c68ce66a --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/instrument/metrics/MetricStoreTest.java @@ -0,0 +1,29 @@ +package org.logstash.instrument.metrics; + +import org.junit.Test; +import org.logstash.instrument.metrics.counter.LongCounter; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +public class MetricStoreTest { + + @Test + public void testHasMetricWhenItContains() { + //setup + List path = Arrays.asList("node", "sashimi", "pipelines", "pipeline01", "plugins", "logstash-output-elasticsearch", "event_in"); + + String metricKey = "event_in"; + List namespaces = Arrays.asList("node", "sashimi", "pipelines", "pipeline01", "plugins", "logstash-output-elasticsearch"); + + MetricStore sut = new MetricStore(); + LongCounter metric = (LongCounter) sut.fetchOrStore(namespaces, metricKey, () -> new LongCounter(metricKey)); + metric.increment(); + + // Exercise + assertTrue(sut.hasMetric(path)); + } + +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/plugins/NamespacedMetricImplTest.java b/logstash-core/src/test/java/org/logstash/plugins/NamespacedMetricImplTest.java index 3f0890274fc..88f68df37fc 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/NamespacedMetricImplTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/NamespacedMetricImplTest.java @@ -27,6 +27,7 @@ import org.jruby.RubyHash; import org.junit.Ignore; import org.junit.Test; +import org.logstash.instrument.metrics.MetricType; import java.util.Arrays; import java.util.Map; @@ -191,7 +192,7 @@ public void testRegister() { assertThat(rightCustomMetric.getValue()).contains("that=2", "another=2"); } - private interface CustomMetric extends UserMetric { + private interface CustomMetric extends UserMetric, org.logstash.instrument.metrics.Metric { void record(final String value); UserMetric.Provider PROVIDER = new UserMetric.Provider(CustomMetric.class, new CustomMetric() { @@ -200,6 +201,16 @@ public void record(String value) { // no-op } + @Override + public String getName() { + return "CustomMetric"; + } + + @Override + public MetricType getType() { + return MetricType.USER; + } + @Override public String getValue() { return ""; @@ -217,6 +228,16 @@ public void record(String value) { mapping.compute(value, (k, v) -> v == null ? 1 : v + 1); } + @Override + public String getName() { + return "CorrelatingCustomMetric"; + } + + @Override + public MetricType getType() { + return MetricType.USER; + } + @Override public String getValue() { return Map.copyOf(mapping).entrySet()