Skip to content

Commit

Permalink
Support dimension sets in config (#238)
Browse files Browse the repository at this point in the history
* Dynamically config metric dimensions

Signed-off-by: Louis Chu <[email protected]>

* Address comments from Peng and Chen

Signed-off-by: Louis Chu <[email protected]>

---------

Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored Feb 8, 2024
1 parent b1d132e commit 0af83e4
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,6 +31,8 @@
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;
import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException;
import com.fasterxml.jackson.databind.ObjectMapper;


/**
* Implementation of the Spark metrics {@link Sink} interface
Expand All @@ -38,6 +45,7 @@
* @author kmccaw
*/
public class CloudWatchSink implements Sink {
private static final ObjectMapper objectMapper = new ObjectMapper();

private final ScheduledReporter reporter;

Expand Down Expand Up @@ -198,12 +206,26 @@ public CloudWatchSink(
metricFilter = MetricFilter.ALL;
}

final Optional<String> dimensionGroupsProperty = getProperty(properties, PropertyKeys.DIMENSION_GROUPS);
DimensionNameGroups dimensionNameGroups = null;
if (dimensionGroupsProperty.isPresent()) {
try {
dimensionNameGroups = objectMapper.readValue(dimensionGroupsProperty.get(), DimensionNameGroups.class);
} catch (IOException e) {
final String message = String.format(
"Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.",
dimensionGroupsProperty.get(),
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message, e);
}
}

final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build();

this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
DimensionedCloudWatchReporter.Builder builder = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(metricFilter)
Expand All @@ -220,8 +242,13 @@ public CloudWatchSink(
.withStatisticSet()
.withGlobalDimensions()
.withShouldParseDimensionsFromName(shouldParseInlineDimensions)
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension)
.build();
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension);

if (dimensionNameGroups != null && dimensionNameGroups.getDimensionGroups() != null) {
builder = builder.withDimensionNameGroups(dimensionNameGroups);
}

this.reporter = builder.withDimensionNameGroups(dimensionNameGroups).build();
}

@Override
Expand Down Expand Up @@ -262,6 +289,7 @@ private static class PropertyKeys {
static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions";
static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension";
static final String METRIC_FILTER_REGEX = "regex";
static final String DIMENSION_GROUPS = "dimensionGroups";
}

/**
Expand All @@ -272,4 +300,45 @@ private static class PropertyDefaults {
static final TimeUnit POLLING_PERIOD_TIME_UNIT = TimeUnit.MINUTES;
static final boolean SHOULD_PARSE_INLINE_DIMENSIONS = false;
}

/**
* Represents a container for grouping dimension names used in metrics reporting.
* This class allows for the organization and storage of dimension names into logical groups,
* facilitating the dynamic construction and retrieval of dimension information for metrics.
*/
public static class DimensionNameGroups {
// Holds the grouping of dimension names categorized under different keys.
private Map<String, List<List<String>>> dimensionGroups = new HashMap<>();

/**
* Sets the mapping of dimension groups. Each key in the map represents a category or a type
* of dimension, and the value is a list of dimension name groups, where each group is itself
* a list of dimension names that are logically grouped together.
*
* @param dimensionGroups A map of dimension groups categorized by keys, where each key maps
* to a list of dimension name groups.
*/
public void setDimensionGroups(Map<String, List<List<String>>> dimensionGroups) {
if (dimensionGroups == null) {
final String message = String.format(
"Undefined value for the \"%s\" CloudWatchSink metrics property.",
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message);
}
this.dimensionGroups = dimensionGroups;
}

/**
* Retrieves the current mapping of dimension groups. The structure of the returned map is such
* that each key represents a specific category or type of dimension, and the corresponding value
* is a list of dimension name groups. Each group is a list of dimension names that are logically
* grouped together.
*
* @return A map representing the groups of dimension names categorized by keys. Each key maps
* to a list of lists, where each inner list is a group of related dimension names.
*/
public Map<String, List<List<String>>> getDimensionGroups() {
return dimensionGroups;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.reporter;

import java.util.Map;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;

import com.amazonaws.services.cloudwatch.model.Dimension;

/**
* Utility class for creating and managing CloudWatch dimensions for metrics reporting in Flint.
* It facilitates the construction of dimensions based on different system properties and environment
* variables, supporting the dynamic tagging of metrics with relevant information like job ID,
* application ID, and more.
*/
public class DimensionUtils {
private static final String DIMENSION_JOB_ID = "jobId";
private static final String DIMENSION_APPLICATION_ID = "applicationId";
private static final String DIMENSION_APPLICATION_NAME = "applicationName";
private static final String DIMENSION_DOMAIN_ID = "domainId";
private static final String DIMENSION_INSTANCE_ROLE = "instanceRole";
private static final String UNKNOWN = "UNKNOWN";

// Maps dimension names to functions that generate Dimension objects based on specific logic or environment variables
private static final Map<String, Function<String[], Dimension>> dimensionBuilders = Map.of(
DIMENSION_INSTANCE_ROLE, DimensionUtils::getInstanceRoleDimension,
DIMENSION_JOB_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_JOB_ID", DIMENSION_JOB_ID),
DIMENSION_APPLICATION_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", DIMENSION_APPLICATION_ID),
DIMENSION_APPLICATION_NAME, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_APPLICATION_NAME", DIMENSION_APPLICATION_NAME),
DIMENSION_DOMAIN_ID, ignored -> getEnvironmentVariableDimension("FLINT_CLUSTER_NAME", DIMENSION_DOMAIN_ID)
);

/**
* Constructs a CloudWatch Dimension object based on the provided dimension name. If a specific
* builder exists for the dimension name, it is used; otherwise, a default dimension is constructed.
*
* @param dimensionName The name of the dimension to construct.
* @param parts Additional information that might be required by specific dimension builders.
* @return A CloudWatch Dimension object.
*/
public static Dimension constructDimension(String dimensionName, String[] metricNameParts) {
if (!doesNameConsistsOfMetricNameSpace(metricNameParts)) {
throw new IllegalArgumentException("The provided metric name parts do not consist of a valid metric namespace.");
}
return dimensionBuilders.getOrDefault(dimensionName, ignored -> getDefaultDimension(dimensionName))
.apply(metricNameParts);
}

// This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137
// Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName.
public static boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) {
return metricNameParts.length >= 3
&& (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1]));
}

/**
* Generates a Dimension object representing the instance role (either executor or driver) based on the
* metric name parts provided.
*
* @param parts An array where the second element indicates the role by being numeric (executor) or not (driver).
* @return A Dimension object with the instance role.
*/
private static Dimension getInstanceRoleDimension(String[] parts) {
String value = StringUtils.isNumeric(parts[1]) ? "executor" : parts[1];
return new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(value);
}

/**
* Constructs a Dimension object using a system environment variable. If the environment variable is not found,
* it uses a predefined "UNKNOWN" value.
*
* @param envVarName The name of the environment variable to use for the dimension's value.
* @param dimensionName The name of the dimension.
* @return A Dimension object populated with the appropriate name and value.
*/
private static Dimension getEnvironmentVariableDimension(String envVarName, String dimensionName) {
String value = System.getenv().getOrDefault(envVarName, UNKNOWN);
return new Dimension().withName(dimensionName).withValue(value);
}

/**
* Provides a generic mechanism to construct a Dimension object with an environment variable value
* or a default value if the environment variable is not set.
*
* @param dimensionName The name of the dimension for which to retrieve the value.
* @return A Dimension object populated with the dimension name and its corresponding value.
*/
private static Dimension getDefaultDimension(String dimensionName) {
return getEnvironmentVariableDimension(dimensionName, dimensionName);
}
}
Loading

0 comments on commit 0af83e4

Please sign in to comment.