From 0af83e434c70367e74aef3b669a3d07bc428c247 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Fri, 9 Feb 2024 02:18:17 +0800 Subject: [PATCH] Support dimension sets in config (#238) * Dynamically config metric dimensions Signed-off-by: Louis Chu * Address comments from Peng and Chen Signed-off-by: Louis Chu --------- Signed-off-by: Louis Chu --- .../spark/metrics/sink/CloudWatchSink.java | 75 ++++++++- .../core/metrics/reporter/DimensionUtils.java | 95 ++++++++++++ .../DimensionedCloudWatchReporter.java | 143 +++++++++++------- ...SinkTests.java => CloudWatchSinkTest.java} | 43 +++++- .../metrics/reporter/DimensionUtilsTest.java | 69 +++++++++ .../DimensionedCloudWatchReporterTest.java | 71 +++++---- .../metrics/reporter/DimensionedNameTest.java | 2 +- 7 files changed, 411 insertions(+), 87 deletions(-) create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java rename flint-core/src/test/java/apache/spark/metrics/sink/{CloudWatchSinkTests.java => CloudWatchSinkTest.java} (62%) create mode 100644 flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java rename flint-core/src/test/java/{ => org}/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java (92%) rename flint-core/src/test/java/{ => org}/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java (97%) diff --git a/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java b/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java index b69f0e4d0..cbeab0a62 100644 --- a/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java +++ b/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java @@ -18,6 +18,11 @@ import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.ScheduledReporter; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -26,6 +31,8 @@ import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter; import org.opensearch.flint.core.metrics.reporter.DimensionedName; import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * Implementation of the Spark metrics {@link Sink} interface @@ -38,6 +45,7 @@ * @author kmccaw */ public class CloudWatchSink implements Sink { + private static final ObjectMapper objectMapper = new ObjectMapper(); private final ScheduledReporter reporter; @@ -198,12 +206,26 @@ public CloudWatchSink( metricFilter = MetricFilter.ALL; } + final Optional dimensionGroupsProperty = getProperty(properties, PropertyKeys.DIMENSION_GROUPS); + DimensionNameGroups dimensionNameGroups = null; + if (dimensionGroupsProperty.isPresent()) { + try { + dimensionNameGroups = objectMapper.readValue(dimensionGroupsProperty.get(), DimensionNameGroups.class); + } catch (IOException e) { + final String message = String.format( + "Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.", + dimensionGroupsProperty.get(), + PropertyKeys.DIMENSION_GROUPS); + throw new InvalidMetricsPropertyException(message, e); + } + } + final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder() .withCredentials(awsCredentialsProvider) .withRegion(awsRegion) .build(); - this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get()) + DimensionedCloudWatchReporter.Builder builder = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get()) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .filter(metricFilter) @@ -220,8 +242,13 @@ public CloudWatchSink( .withStatisticSet() .withGlobalDimensions() 
.withShouldParseDimensionsFromName(shouldParseInlineDimensions) - .withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension) - .build(); + .withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension); + + if (dimensionNameGroups != null && dimensionNameGroups.getDimensionGroups() != null) { + builder = builder.withDimensionNameGroups(dimensionNameGroups); + } + + this.reporter = builder.withDimensionNameGroups(dimensionNameGroups).build(); } @Override @@ -262,6 +289,7 @@ private static class PropertyKeys { static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions"; static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension"; static final String METRIC_FILTER_REGEX = "regex"; + static final String DIMENSION_GROUPS = "dimensionGroups"; } /** @@ -272,4 +300,45 @@ private static class PropertyDefaults { static final TimeUnit POLLING_PERIOD_TIME_UNIT = TimeUnit.MINUTES; static final boolean SHOULD_PARSE_INLINE_DIMENSIONS = false; } + + /** + * Represents a container for grouping dimension names used in metrics reporting. + * This class allows for the organization and storage of dimension names into logical groups, + * facilitating the dynamic construction and retrieval of dimension information for metrics. + */ + public static class DimensionNameGroups { + // Holds the grouping of dimension names categorized under different keys. + private Map>> dimensionGroups = new HashMap<>(); + + /** + * Sets the mapping of dimension groups. Each key in the map represents a category or a type + * of dimension, and the value is a list of dimension name groups, where each group is itself + * a list of dimension names that are logically grouped together. + * + * @param dimensionGroups A map of dimension groups categorized by keys, where each key maps + * to a list of dimension name groups. 
+ */ + public void setDimensionGroups(Map>> dimensionGroups) { + if (dimensionGroups == null) { + final String message = String.format( + "Undefined value for the \"%s\" CloudWatchSink metrics property.", + PropertyKeys.DIMENSION_GROUPS); + throw new InvalidMetricsPropertyException(message); + } + this.dimensionGroups = dimensionGroups; + } + + /** + * Retrieves the current mapping of dimension groups. The structure of the returned map is such + * that each key represents a specific category or type of dimension, and the corresponding value + * is a list of dimension name groups. Each group is a list of dimension names that are logically + * grouped together. + * + * @return A map representing the groups of dimension names categorized by keys. Each key maps + * to a list of lists, where each inner list is a group of related dimension names. + */ + public Map>> getDimensionGroups() { + return dimensionGroups; + } + } } diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java new file mode 100644 index 000000000..ce7136507 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java @@ -0,0 +1,95 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics.reporter; + +import java.util.Map; +import java.util.function.Function; +import org.apache.commons.lang.StringUtils; + +import com.amazonaws.services.cloudwatch.model.Dimension; + +/** + * Utility class for creating and managing CloudWatch dimensions for metrics reporting in Flint. + * It facilitates the construction of dimensions based on different system properties and environment + * variables, supporting the dynamic tagging of metrics with relevant information like job ID, + * application ID, and more. 
+ */ +public class DimensionUtils { + private static final String DIMENSION_JOB_ID = "jobId"; + private static final String DIMENSION_APPLICATION_ID = "applicationId"; + private static final String DIMENSION_APPLICATION_NAME = "applicationName"; + private static final String DIMENSION_DOMAIN_ID = "domainId"; + private static final String DIMENSION_INSTANCE_ROLE = "instanceRole"; + private static final String UNKNOWN = "UNKNOWN"; + + // Maps dimension names to functions that generate Dimension objects based on specific logic or environment variables + private static final Map> dimensionBuilders = Map.of( + DIMENSION_INSTANCE_ROLE, DimensionUtils::getInstanceRoleDimension, + DIMENSION_JOB_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_JOB_ID", DIMENSION_JOB_ID), + DIMENSION_APPLICATION_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", DIMENSION_APPLICATION_ID), + DIMENSION_APPLICATION_NAME, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_APPLICATION_NAME", DIMENSION_APPLICATION_NAME), + DIMENSION_DOMAIN_ID, ignored -> getEnvironmentVariableDimension("FLINT_CLUSTER_NAME", DIMENSION_DOMAIN_ID) + ); + + /** + * Constructs a CloudWatch Dimension object based on the provided dimension name. If a specific + * builder exists for the dimension name, it is used; otherwise, a default dimension is constructed. + * + * @param dimensionName The name of the dimension to construct. + * @param metricNameParts Additional information that might be required by specific dimension builders. + * @return A CloudWatch Dimension object. 
+ */ + public static Dimension constructDimension(String dimensionName, String[] metricNameParts) { + if (!doesNameConsistsOfMetricNameSpace(metricNameParts)) { + throw new IllegalArgumentException("The provided metric name parts do not consist of a valid metric namespace."); + } + return dimensionBuilders.getOrDefault(dimensionName, ignored -> getDefaultDimension(dimensionName)) + .apply(metricNameParts); + } + + // This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137 + // Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName. + public static boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) { + return metricNameParts.length >= 3 + && (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1])); + } + + /** + * Generates a Dimension object representing the instance role (either executor or driver) based on the + * metric name parts provided. + * + * @param parts An array where the second element indicates the role by being numeric (executor) or not (driver). + * @return A Dimension object with the instance role. + */ + private static Dimension getInstanceRoleDimension(String[] parts) { + String value = StringUtils.isNumeric(parts[1]) ? "executor" : parts[1]; + return new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(value); + } + + /** + * Constructs a Dimension object using a system environment variable. If the environment variable is not found, + * it uses a predefined "UNKNOWN" value. + * + * @param envVarName The name of the environment variable to use for the dimension's value. + * @param dimensionName The name of the dimension. + * @return A Dimension object populated with the appropriate name and value. 
+ */ + private static Dimension getEnvironmentVariableDimension(String envVarName, String dimensionName) { + String value = System.getenv().getOrDefault(envVarName, UNKNOWN); + return new Dimension().withName(dimensionName).withValue(value); + } + + /** + * Provides a generic mechanism to construct a Dimension object with an environment variable value + * or a default value if the environment variable is not set. + * + * @param dimensionName The name of the dimension for which to retrieve the value. + * @return A Dimension object populated with the dimension name and its corresponding value. + */ + private static Dimension getDefaultDimension(String dimensionName) { + return getEnvironmentVariableDimension(dimensionName, dimensionName); + } +} \ No newline at end of file diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index a47fa70ce..e16eb0021 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -48,9 +47,12 @@ import java.util.stream.LongStream; import java.util.stream.Stream; import org.apache.commons.lang.StringUtils; +import org.apache.spark.metrics.sink.CloudWatchSink.DimensionNameGroups; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.opensearch.flint.core.metrics.reporter.DimensionUtils.constructDimension; + /** * Reports metrics to Amazon's CloudWatch periodically. *

@@ -84,16 +86,6 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter { // Visible for testing public static final String DIMENSION_SNAPSHOT_STD_DEV = "snapshot-std-dev"; - public static final String DIMENSION_JOB_ID = "jobId"; - - public static final String DIMENSION_APPLICATION_ID = "applicationId"; - - public static final String DIMENSION_DOMAIN_ID = "domainId"; - - public static final String DIMENSION_INSTANCE_ROLE = "instanceRole"; - - public static final String UNKNOWN = "unknown"; - /** * Amazon CloudWatch rejects values that are either too small or too large. * Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10) or 2e-360 to 2e360 (Base 2). @@ -103,6 +95,8 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter { private static final double SMALLEST_SENDABLE_VALUE = 8.515920e-109; private static final double LARGEST_SENDABLE_VALUE = 1.174271e+108; + private static Map constructedDimensions; + /** * Each CloudWatch API request may contain at maximum 20 datums */ @@ -133,6 +127,7 @@ private DimensionedCloudWatchReporter(final Builder builder) { this.durationUnit = builder.cwDurationUnit; this.shouldParseDimensionsFromName = builder.withShouldParseDimensionsFromName; this.shouldAppendDropwizardTypeDimension = builder.withShouldAppendDropwizardTypeDimension; + this.constructedDimensions = new ConcurrentHashMap<>(); this.filter = MetricFilter.ALL; } @@ -349,34 +344,89 @@ private void stageMetricDatum(final boolean metricConfigured, // Only submit metrics that show some data, so let's save some money if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) { final DimensionedName dimensionedName = DimensionedName.decode(metricName); + // Add global dimensions for all metrics final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); - MetricInfo metricInfo = getMetricInfo(dimensionedName); - dimensions.addAll(metricInfo.getDimensions()); if (shouldAppendDropwizardTypeDimension) 
{ dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue)); } - metricData.add(new MetricDatum() - .withTimestamp(new Date(builder.clock.getTime())) - .withValue(cleanMetricValue(metricValue)) - .withMetricName(metricInfo.getMetricName()) - .withDimensions(dimensions) - .withUnit(standardUnit)); + MetricInfo metricInfo = getMetricInfo(dimensionedName, dimensions); + for (Set dimensionSet : metricInfo.getDimensionSets()) { + MetricDatum datum = new MetricDatum() + .withTimestamp(new Date(builder.clock.getTime())) + .withValue(cleanMetricValue(metricValue)) + .withMetricName(metricInfo.getMetricName()) + .withDimensions(dimensionSet) + .withUnit(standardUnit); + metricData.add(datum); + } } } - public MetricInfo getMetricInfo(DimensionedName dimensionedName) { + /** + * Constructs a {@link MetricInfo} object based on the provided {@link DimensionedName} and a set of additional dimensions. + * This method processes the metric name contained within {@code dimensionedName} to potentially modify it based on naming conventions + * and extracts or generates additional dimension sets for detailed metrics reporting. + *

+ * If no specific naming convention is detected, the original set of dimensions is used as is. The method finally encapsulates the metric name + * and the collection of dimension sets in a {@link MetricInfo} object and returns it. + * + * @param dimensionedName An instance of {@link DimensionedName} containing the original metric name and any directly associated dimensions. + * @param dimensions A set of {@link Dimension} objects provided externally that should be associated with the metric. + * @return A {@link MetricInfo} object containing the processed metric name and a list of dimension sets for metrics reporting. + */ + private MetricInfo getMetricInfo(DimensionedName dimensionedName, Set dimensions) { + // Add dimensions from dimensionedName + dimensions.addAll(dimensionedName.getDimensions()); + String metricName = dimensionedName.getName(); String[] parts = metricName.split("\\."); - Set dimensions = new HashSet<>(); - if (doesNameConsistsOfMetricNameSpace(parts)) { + List> dimensionSets = new ArrayList<>(); + if (DimensionUtils.doesNameConsistsOfMetricNameSpace(parts)) { metricName = constructMetricName(parts); - addInstanceRoleDimension(dimensions, parts); + // Get dimension sets corresponding to a specific metric source + constructDimensionSets(dimensionSets, parts); + // Add dimensions constructed above into each of the dimensionSets + for (Set dimensionSet : dimensionSets) { + // Create a copy of each set and add the additional dimensions + dimensionSet.addAll(dimensions); + } + } + + if (dimensionSets.isEmpty()) { + dimensionSets.add(dimensions); + } + return new MetricInfo(metricName, dimensionSets); + } + + /** + * Populates a list of dimension sets based on the metric source name extracted from the metric's parts + * and predefined dimension groupings. This method aims to create detailed and structured dimension + * sets for metrics, enhancing the granularity and relevance of metric reporting. 
+ * + * If no predefined dimension groups exist for the metric source, or if the dimension name groups are + * not initialized, the method exits without modifying the dimension sets list. + * + * @param dimensionSets A list to be populated with sets of {@link Dimension} objects, each representing + * a group of dimensions relevant to the metric's source. + * @param parts An array of strings derived from splitting the metric's name, used to extract information + * like the metric source name and to construct dimensions based on naming conventions. + */ + private void constructDimensionSets(List> dimensionSets, String[] parts) { + String metricSourceName = parts[2]; + if (builder.dimensionNameGroups == null || builder.dimensionNameGroups.getDimensionGroups() == null || !builder.dimensionNameGroups.getDimensionGroups().containsKey(metricSourceName)) { + return; + } + + for (List dimensionNames: builder.dimensionNameGroups.getDimensionGroups().get(metricSourceName)) { + Set dimensions = new LinkedHashSet<>(); + for (String dimensionName: dimensionNames) { + constructedDimensions.putIfAbsent(dimensionName, constructDimension(dimensionName, parts)); + dimensions.add(constructedDimensions.get(dimensionName)); + } + dimensionSets.add(dimensions); } - addDefaultDimensionsForSparkJobMetrics(dimensions); - dimensions.addAll(dimensionedName.getDimensions()); - return new MetricInfo(metricName, dimensions); } /** @@ -393,31 +443,6 @@ private String constructMetricName(String[] metricNameParts) { return Stream.of(metricNameParts).skip(partsToSkip).collect(Collectors.joining(".")); } - // These dimensions are for all metrics - // TODO: Remove EMR-S specific env vars https://github.com/opensearch-project/opensearch-spark/issues/231 - private static void addDefaultDimensionsForSparkJobMetrics(Set dimensions) { - final String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", UNKNOWN); - final String applicationId = 
System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", UNKNOWN); - dimensions.add(new Dimension().withName(DIMENSION_JOB_ID).withValue(jobId)); - dimensions.add(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(applicationId)); - } - - private static void addInstanceRoleDimension(Set dimensions, String[] parts) { - Dimension instanceRoleDimension; - if (StringUtils.isNumeric(parts[1])) { - instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("executor"); - } else { - instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(parts[1]); - } - dimensions.add(instanceRoleDimension); - } - // This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137 - // Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName. - private boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) { - return metricNameParts.length >= 3 - && (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1])); - } - private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured, final String metricName, final Snapshot snapshot, @@ -545,19 +570,19 @@ public String getDesc() { public static class MetricInfo { private String metricName; - private Set dimensions; + private List> dimensionSets; - public MetricInfo(String metricName, Set dimensions) { + public MetricInfo(String metricName, List> dimensionSets) { this.metricName = metricName; - this.dimensions = dimensions; + this.dimensionSets = dimensionSets; } public String getMetricName() { return metricName; } - public Set getDimensions() { - return dimensions; + public List> getDimensionSets() { + return dimensionSets; } } @@ -587,6 +612,7 @@ public static class Builder { private StandardUnit cwRateUnit; private StandardUnit cwDurationUnit; private Set 
globalDimensions; + private DimensionNameGroups dimensionNameGroups; private final Clock clock; private Builder( @@ -787,6 +813,11 @@ public Builder withShouldAppendDropwizardTypeDimension(final boolean value) { return this; } + public Builder withDimensionNameGroups(final DimensionNameGroups dimensionNameGroups) { + this.dimensionNameGroups = dimensionNameGroups; + return this; + } + /** * Does not actually POST to CloudWatch, logs the {@link PutMetricDataRequest putMetricDataRequest} instead. * {@code false} by default. diff --git a/flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTests.java b/flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTest.java similarity index 62% rename from flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTests.java rename to flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTest.java index 6f87276a8..db2948858 100644 --- a/flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTests.java +++ b/flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTest.java @@ -16,7 +16,10 @@ import java.util.Properties; import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException; -class CloudWatchSinkTests { +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +class CloudWatchSinkTest { private final MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class); private final SecurityManager securityManager = Mockito.mock(SecurityManager.class); @@ -71,6 +74,44 @@ void should_throwException_when_pollingTimeUnitPropertyIsInvalid() { Assertions.assertThrows(InvalidMetricsPropertyException.class, executable); } + @Test + void should_throwException_when_DimensionGroupsPropertyIsInvalid() { + final Properties properties = getDefaultValidProperties(); + String jsonString = "{\"dimensionGroups\":[{\"MetricSource1\":{}}, [\"Dimension1\",\"Dimension2\",\"Dimension3\"]]}]}"; + properties.setProperty("dimensionGroups", 
jsonString); + final Executable executable = () -> { + final CloudWatchSink cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager); + }; + InvalidMetricsPropertyException exception = Assertions.assertThrows(InvalidMetricsPropertyException.class, executable); + StringBuilder expectedMessageBuilder = new StringBuilder(); + expectedMessageBuilder.append("Unable to parse value (") + .append(jsonString) + .append(") for the \"dimensionGroups\" CloudWatchSink metrics property."); + Assertions.assertEquals(expectedMessageBuilder.toString(), exception.getMessage()); + } + + @Test + public void should_CreateCloudWatchSink_When_dimensionGroupsPropertyIsValid() { + final Properties properties = getDefaultValidProperties(); + String jsonString = "{" + + "\"dimensionGroups\": {" + + "\"MetricSource1\": [[\"DimensionA1\", \"DimensionA2\"], [\"DimensionA1\"]]," + + "\"MetricSource2\": [[\"DimensionB1\"], [\"DimensionB2\", \"DimensionB3\", \"DimensionB4\"]]," + + "\"MetricSource3\": [[\"DimensionC1\", \"DimensionC2\", \"DimensionC3\"], [\"DimensionC4\"], [\"DimensionC5\", \"DimensionC6\"]]" + + "}" + + "}"; + properties.setProperty("dimensionGroups", jsonString); + + CloudWatchSink cloudWatchSink = null; + try { + cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager); + } catch (Exception e) { + fail("Should not have thrown any exception, but threw: " + e.getMessage()); + } + + assertNotNull("CloudWatchSink should be created", cloudWatchSink); + } + private Properties getDefaultValidProperties() { final Properties properties = new Properties(); properties.setProperty("namespace", "namespaceValue"); diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java new file mode 100644 index 000000000..7fab8c346 --- /dev/null +++ 
b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics.reporter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.amazonaws.services.cloudwatch.model.Dimension; +import org.junit.jupiter.api.function.Executable; + +import java.lang.reflect.Field; +import java.util.Map; + +public class DimensionUtilsTest { + private static final String[] parts = {"someMetric", "123", "dummySource"}; + + @Test + void testConstructDimensionThrowsIllegalArgumentException() { + String dimensionName = "InvalidDimension"; + String[] metricNameParts = {}; + + final Executable executable = () -> { + DimensionUtils.constructDimension(dimensionName, metricNameParts); + }; + IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class, executable); + Assertions.assertEquals("The provided metric name parts do not consist of a valid metric namespace.", exception.getMessage()); + } + @Test + public void testGetInstanceRoleDimensionWithExecutor() { + Dimension result = DimensionUtils.constructDimension("instanceRole", parts); + assertEquals("instanceRole", result.getName()); + assertEquals("executor", result.getValue()); + } + + @Test + public void testGetInstanceRoleDimensionWithRoleName() { + String[] parts = {"someMetric", "driver", "dummySource"}; + Dimension result = DimensionUtils.constructDimension("instanceRole", parts); + assertEquals("instanceRole", result.getName()); + assertEquals("driver", result.getValue()); + } + + @Test + public void testGetDefaultDimensionWithUnknown() { + Dimension result = DimensionUtils.constructDimension("nonExistentDimension", parts); + assertEquals("nonExistentDimension", result.getName()); + assertEquals("UNKNOWN", result.getValue()); + 
} + + @Test + public void testGetDimensionsFromSystemEnv() throws NoSuchFieldException, IllegalAccessException { + Class classOfMap = System.getenv().getClass(); + Field field = classOfMap.getDeclaredField("m"); + field.setAccessible(true); + Map writeableEnvironmentVariables = (Map)field.get(System.getenv()); + writeableEnvironmentVariables.put("TEST_VAR", "dummy1"); + writeableEnvironmentVariables.put("SERVERLESS_EMR_JOB_ID", "dummy2"); + Dimension result1 = DimensionUtils.constructDimension("TEST_VAR", parts); + assertEquals("TEST_VAR", result1.getName()); + assertEquals("dummy1", result1.getValue()); + Dimension result2 = DimensionUtils.constructDimension("jobId", parts); + assertEquals("jobId", result2.getName()); + assertEquals("dummy2", result2.getValue()); + } +} diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java similarity index 92% rename from flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java rename to flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java index 4774bcc0b..db58993ef 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package opensearch.flint.core.metrics.reporter; +package org.opensearch.flint.core.metrics.reporter; import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClient; import com.amazonaws.services.cloudwatch.model.Dimension; @@ -16,8 +16,15 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SlidingWindowReservoir; + +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; + +import org.apache.spark.metrics.sink.CloudWatchSink; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,8 +43,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter; -import org.opensearch.flint.core.metrics.reporter.DimensionedName; import static com.amazonaws.services.cloudwatch.model.StandardUnit.Count; import static com.amazonaws.services.cloudwatch.model.StandardUnit.Microseconds; @@ -49,16 +54,12 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_APPLICATION_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_COUNT; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_GAUGE; -import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_INSTANCE_ROLE; -import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_JOB_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_NAME_TYPE; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_MEAN; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_STD_DEV; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_SUMMARY; -import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.UNKNOWN; 
@ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -110,9 +111,8 @@ public void shouldReportWithoutGlobalDimensionsWhenGlobalDimensionsNotConfigured final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); - assertThat(dimensions).hasSize(3); + assertThat(dimensions).hasSize(1); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); - assertDefaultDimensionsWithUnknownValue(dimensions); } @@ -124,7 +124,6 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception { final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); - assertDefaultDimensionsWithUnknownValue(dimensions); } @Test @@ -135,7 +134,6 @@ public void reportedGaugeShouldContainExpectedDimension() throws Exception { final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_GAUGE)); - assertDefaultDimensionsWithUnknownValue(dimensions); } @Test @@ -483,7 +481,6 @@ public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception { assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); - assertDefaultDimensionsWithUnknownValue(dimensions); } @Test @@ -495,16 +492,14 @@ public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceDriverMetric() .build().encode()).inc(); reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report(); final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest(); - Set dimensions = metricInfo.getDimensions(); + List> dimensionSets = 
metricInfo.getDimensionSets(); + Set dimensions = dimensionSets.get(0); assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("driver")); assertThat(metricInfo.getMetricName()).isEqualTo("LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver"); } - @Test + @Test public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceExecutorMetric() throws Exception { //setting jobId as unknown to invoke name parsing. metricRegistry.counter(DimensionedName.withName("unknown.1.NettyBlockTransfer.shuffle-client.usedDirectMemory") @@ -514,23 +509,44 @@ public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceExecutorMetric( reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report(); final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest(); - Set dimensions = metricInfo.getDimensions(); + Set dimensions = metricInfo.getDimensionSets().get(0); assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue( "executor")); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new 
Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); assertThat(metricInfo.getMetricName()).isEqualTo("NettyBlockTransfer.shuffle-client.usedDirectMemory"); } + @Test + public void shouldConsumeMultipleMetricDatumWithDimensionGroups() throws Exception { + // Setup + String metricSourceName = "TestSource"; + Map>> dimensionGroups = new HashMap<>(); + dimensionGroups.put(metricSourceName, Arrays.asList( + Arrays.asList("appName", "instanceRole"), + Arrays.asList("appName") + )); + + metricRegistry.counter(DimensionedName.withName("unknown.1.TestSource.shuffle-client.usedDirectMemory") + .build().encode()).inc(); + + CloudWatchSink.DimensionNameGroups dimensionNameGroups = new CloudWatchSink.DimensionNameGroups(); + dimensionNameGroups.setDimensionGroups(dimensionGroups); + reporterBuilder.withDimensionNameGroups(dimensionNameGroups).build().report(); + final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); + final List metricDatums = putMetricDataRequest.getMetricData(); + assertThat(metricDatums).hasSize(2); - private void assertDefaultDimensionsWithUnknownValue(List dimensions) { - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); - } + MetricDatum metricDatum1 = metricDatums.get(0); + Set dimensions1 = new HashSet(metricDatum1.getDimensions()); + assertThat(dimensions1).contains(new Dimension().withName("appName").withValue("UNKNOWN")); + assertThat(dimensions1).contains(new Dimension().withName("instanceRole").withValue("executor")); + MetricDatum metricDatum2 = metricDatums.get(1); + Set dimensions2 = new HashSet(metricDatum2.getDimensions()); + assertThat(dimensions2).contains(new Dimension().withName("appName").withValue("UNKNOWN")); + assertThat(dimensions2).doesNotContain(new Dimension().withName("instanceRole").withValue("executor")); + } private MetricDatum 
metricDatumByDimensionFromCapturedRequest(final String dimensionValue) { final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); @@ -564,7 +580,10 @@ private List firstMetricDatumDimensionsFromCapturedRequest() { private DimensionedCloudWatchReporter.MetricInfo firstMetricDatumInfoFromCapturedRequest() { final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); final MetricDatum metricDatum = putMetricDataRequest.getMetricData().get(0); - return new DimensionedCloudWatchReporter.MetricInfo(metricDatum.getMetricName(), new HashSet<>(metricDatum.getDimensions())); + Set dimensions = new HashSet(metricDatum.getDimensions()); + List> dimensionSet = new ArrayList<>(); + dimensionSet.add(dimensions); + return new DimensionedCloudWatchReporter.MetricInfo(metricDatum.getMetricName(), dimensionSet); } private List allDimensionsFromCapturedRequest() { diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java similarity index 97% rename from flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java rename to flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java index d6145545d..6bc6a9c2d 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java @@ -1,4 +1,4 @@ -package opensearch.flint.core.metrics.reporter; +package org.opensearch.flint.core.metrics.reporter; import static org.hamcrest.CoreMatchers.hasItems;