From 8ac4d2188bef8f3bce7bf816fae206924fed2adc Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 20 May 2025 09:21:07 +0530 Subject: [PATCH 01/11] added micrometer dependency Signed-off-by: psingh3 --- java-client/build.gradle.kts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 35c3f3f98..98a20d68e 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -246,6 +246,13 @@ dependencies { testImplementation("junit", "junit" , "4.13.2") { exclude(group = "org.hamcrest") } + + + // Micrometer + implementation("io.micrometer:micrometer-core:1.12.3") + + // Awaitility + testImplementation("org.awaitility:awaitility:4.2.0") } licenseReport { From f4d4e435fbdb2037b9af4d2b81da3903a073a47e Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 20 May 2025 10:13:03 +0530 Subject: [PATCH 02/11] added client side metrics Signed-off-by: psingh3 --- java-client/build.gradle.kts | 1 - .../ExecutionMetricContext.java | 46 ++ .../client_metrics/MeterOptions.java | 47 ++ .../client_metrics/MetricConstants.java | 13 + .../transport/client_metrics/MetricGroup.java | 12 + .../transport/client_metrics/MetricName.java | 22 + .../client_metrics/MetricOptions.java | 141 ++++++ .../transport/client_metrics/MetricTag.java | 20 + .../NetworkRequestMetricContext.java | 38 ++ .../client_metrics/RequestMetricContext.java | 26 + .../TelemetryMetricsManager.java | 250 ++++++++++ .../ApacheHttpClient5Transport.java | 151 +++++- .../ApacheHttpClient5TransportBuilder.java | 10 +- .../httpclient5/OpenSearchRequestFuture.java | 12 + .../integTest/AbstractClientMetricsIT.java | 466 ++++++++++++++++++ .../OpenSearchJavaClientTestCase.java | 47 ++ .../integTest/OpenSearchTransportSupport.java | 5 + .../ApacheHttpClient5TransportTest.java | 55 +++ .../HttpClient5TransportSupport.java | 29 ++ .../client_metrics/MeterOptionsTest.java | 38 ++ .../client_metrics/MetricOptionsTest.java | 81 +++ .../TelemetryMetricsManagerTest.java | 288 +++++++++++ 22 files changed, 1793 insertions(+), 5 deletions(-) create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java create mode 100644 java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java create mode 100644 java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java create mode 100644 java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java create mode 100644 java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java create mode 100644 java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 98a20d68e..adc4a67f4 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -247,7 +247,6 @@ dependencies { exclude(group = "org.hamcrest") } - // Micrometer implementation("io.micrometer:micrometer-core:1.12.3") diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java new file mode 100644 index 000000000..41b1805a8 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java @@ -0,0 +1,46 @@ +package org.opensearch.client.transport.client_metrics; + +import java.time.Duration; + +/** + * Contains necessary information about a request execution to be used for metric recordings. + */ +public abstract class ExecutionMetricContext { + + public static final int DEFAULT_EMPTY_STATUS_CODE = -1; + private Throwable throwable = null; + private int statusCode = DEFAULT_EMPTY_STATUS_CODE; + private Duration executionTime = null; + + protected ExecutionMetricContext() {} + + protected ExecutionMetricContext(Throwable throwable, int statusCode, Duration executionTime) { + this.throwable = throwable; + this.statusCode = statusCode; + this.executionTime = executionTime; + } + + public Throwable getThrowable() { + return throwable; + } + + public int getStatusCode() { + return statusCode; + } + + public Duration getRequestExecutionTime() { + return executionTime; + } + + public void setThrowable(Throwable throwable) { + this.throwable = throwable; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public void setRequestExecutionTime(Duration executionTime) { + this.executionTime = executionTime; + } +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java new file mode 100644 index 000000000..da9f86d5e --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java @@ -0,0 +1,47 @@ +package org.opensearch.client.transport.client_metrics; + +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_EXCLUDED_TAGS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_PERCENTILES; + +import io.micrometer.core.instrument.Tags; +import java.util.Set; + +/** + * Contains settings for configuring a meter + */ +public class MeterOptions { + private final double[] percentiles; + private final Tags commonTags; + private final Set excludedTagNames; + + public MeterOptions(double[] percentiles, Tags commonTags, Set excludedTagNames) { + this.percentiles = percentiles == null ? DEFAULT_PERCENTILES.clone() : percentiles.clone(); + this.commonTags = commonTags == null ? Tags.empty() : commonTags; + this.excludedTagNames = excludedTagNames == null ? DEFAULT_EXCLUDED_TAGS : excludedTagNames; + } + + /** + * Get percentiles to publish for Timer/Distribution meters + * @return a double array + */ + public double[] getPercentiles() { + return percentiles; + } + + /** + * Get common {@link io.micrometer.core.instrument.Tags} that this meter need to have + * @return a {@link io.micrometer.core.instrument.Tags} + */ + public Tags getCommonTags() { + return commonTags; + } + + /** + * Get tag names that a meter are excluded. + * @return a set of {@link MetricTag} + */ + public Set getExcludedTagNames() { + return excludedTagNames; + } + +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java new file mode 100644 index 000000000..3020893c4 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java @@ -0,0 +1,13 @@ +package org.opensearch.client.transport.client_metrics; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import java.util.EnumSet; +import java.util.Set; + +public class MetricConstants { + public static final double[] DEFAULT_PERCENTILES = new double[] { 0.99, 0.95, 0.9, 0.75, 0.5 }; + public static final MeterRegistry DEFAULT_REGISTRY = Metrics.globalRegistry; + public static final Set DEFAULT_EXCLUDED_TAGS = EnumSet.noneOf(MetricTag.class); + public static final Set DEFAULT_ADDITIONAL_METRIC_GROUPS = EnumSet.noneOf(MetricGroup.class); +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java new file mode 100644 index 000000000..108142282 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java @@ -0,0 +1,12 @@ +package org.opensearch.client.transport.client_metrics; + +import java.util.EnumSet; +import java.util.Set; + +public enum MetricGroup { + GENERAL, + NETWORK_DETAILS; + + public static final Set REQUIRED_GROUPS = EnumSet.of(GENERAL); + public static final Set ALL = EnumSet.allOf(MetricGroup.class); +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java new file mode 100644 index 000000000..f6766e0f2 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java @@ -0,0 +1,22 @@ +package org.opensearch.client.transport.client_metrics; + +public enum MetricName { + REQUEST("request"), + NETWORK_REQUEST("network.request"), + ACTIVE_NODES("active.nodes"), + INACTIVE_NODES("inactive.nodes"), + REQUEST_PAYLOAD_SIZE("request.payload.size"), + RESPONSE_PAYLOAD_SIZE("response.payload.size"); + + private static final String PREFIX = "os.client"; + private final String metricName; + + MetricName(String metricName) { + this.metricName = metricName; + } + + @Override + public String toString() { + return PREFIX + "." + metricName; + } +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java new file mode 100644 index 000000000..741fd77c5 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java @@ -0,0 +1,141 @@ +package org.opensearch.client.transport.client_metrics; + +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_ADDITIONAL_METRIC_GROUPS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_EXCLUDED_TAGS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_PERCENTILES; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; + +import io.micrometer.core.instrument.MeterRegistry; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Set; + +/** + * Contains settings to configure metrics + */ +public class MetricOptions { + private final boolean isEnabled; + private final MeterRegistry meterRegistry; + private final String clientId; + private final double[] percentiles; + private final Set excludedTags; + private final Set metricGroups; + + public MetricOptions(MetricOptionsBuilder builder) { + meterRegistry = builder.meterRegistry == null ? DEFAULT_REGISTRY : builder.meterRegistry; + clientId = builder.clientId == null || builder.clientId.isEmpty() + ? String.valueOf(TelemetryMetricsManager.generateClientID()) + : builder.clientId; + percentiles = builder.percentiles == null ? DEFAULT_PERCENTILES : builder.percentiles; + isEnabled = builder.isEnabled; + excludedTags = builder.excludedTags == null ? DEFAULT_EXCLUDED_TAGS : builder.excludedTags; + metricGroups = builder.metricGroups == null ? DEFAULT_ADDITIONAL_METRIC_GROUPS : builder.metricGroups; + } + + public static MetricOptionsBuilder builder() { + return new MetricOptionsBuilder(); + } + + public boolean isMetricsEnabled() { + return isEnabled; + } + + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + + public String getClientId() { + return clientId; + } + + public double[] getPercentiles() { + return percentiles; + } + + public Set getExcludedTags() { + return excludedTags; + } + + public Set getMetricGroups() { + return metricGroups; + } + + public static class MetricOptionsBuilder { + private boolean isEnabled; + private MeterRegistry meterRegistry = null; + private String clientId = null; + private double[] percentiles = null; + private Set excludedTags = null; + private Set metricGroups = null; + + /** + * Set whether the metrics system is enabled + * @param enabled true to enable metrics; otherwise, false + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setMetricsEnabled(boolean enabled) { + this.isEnabled = enabled; + return this; + } + + /** + * Set a {@link MeterRegistry} for the client to propagate metrics + * @param meterRegistry a {@link MeterRegistry} instance + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + return this; + } + + /** + * Give the client a name/ID + * @param clientId a name + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + /** + * Set percentiles of distribution metrics to be published. For instance, pass {@code 0.95}, + * {@code 0.99} to publish request latency P99 and P95. + * + * @param percentiles percentile values to publish + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setPercentiles(double... percentiles) { + this.percentiles = percentiles; + return this; + } + + /** + * Set tags that metrics cannot have. Each metric has a set of required tags + * that cannot be excluded. However, for optional tags, they can be dropped based on + * the tags set here. This option allows users to exclude high cardinality tags. + * @param tags {@link MetricTag} names to exclude + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setExcludedTags(MetricTag... tags) { + this.excludedTags = EnumSet.copyOf(Arrays.asList(tags)); + return this; + } + + /** + * Set {@link MetricGroup} that you would like to be enabled in addition to groups under + * {@link MetricGroup#REQUIRED_GROUPS}. + * + * @param metricGroups {@link MetricGroup} names to be enabled + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setAdditionalMetricGroups(MetricGroup... metricGroups) { + this.metricGroups = EnumSet.copyOf(Arrays.asList(metricGroups)); + return this; + } + + public MetricOptions build() { + return new MetricOptions(this); + } + } +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java new file mode 100644 index 000000000..627984752 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java @@ -0,0 +1,20 @@ +package org.opensearch.client.transport.client_metrics; + +public enum MetricTag { + REQUEST("Request"), + CLIENT_ID("ClientID"), + STATUS_CODE_OR_EXCEPTION("StatusCodeOrException"), + HOST_CONTACTED("HostContacted"), + HOST("Host"); + + private String name; + + MetricTag(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java new file mode 100644 index 000000000..1aeee6822 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java @@ -0,0 +1,38 @@ +package org.opensearch.client.transport.client_metrics; + +import java.time.Duration; + +public class NetworkRequestMetricContext extends ExecutionMetricContext { + private String hostName; + private long responsePayloadSize = -1; + private long requestPayloadSize = -1; + + public NetworkRequestMetricContext(String hostName, Throwable throwable, int statusCode, Duration executionTime) { + super(throwable, statusCode, executionTime); + this.hostName = hostName; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public long getResponsePayloadSize() { + return responsePayloadSize; + } + + public void setResponsePayloadSize(long responsePayloadSize) { + this.responsePayloadSize = responsePayloadSize; + } + + public long getRequestPayloadSize() { + return requestPayloadSize; + } + + public void setRequestPayloadSize(long requestPayloadSize) { + this.requestPayloadSize = requestPayloadSize; + } +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java new file mode 100644 index 000000000..bf8f3c0e0 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java @@ -0,0 +1,26 @@ +package org.opensearch.client.transport.client_metrics; + +import java.util.ArrayList; +import java.util.List; + +public class RequestMetricContext extends ExecutionMetricContext { + private final List networkRequestContexts = new ArrayList<>(); + + public List getNetworkRequestContexts() { + return networkRequestContexts; + } + + public String getContactedHosts() { + StringBuilder builder = new StringBuilder(); + if (!networkRequestContexts.isEmpty()) { + networkRequestContexts.forEach(networkRequest -> builder.append(networkRequest.getHostName()).append(",")); + return builder.substring(0, builder.length() - 1); + } + return "NONE"; + } + + public void addNetworkRequestContext(NetworkRequestMetricContext nodeContext) { + networkRequestContexts.add(nodeContext); + } + +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java new file mode 100644 index 000000000..89180d6e8 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java @@ -0,0 +1,250 @@ +package org.opensearch.client.transport.client_metrics; + +import static org.opensearch.client.transport.client_metrics.ExecutionMetricContext.DEFAULT_EMPTY_STATUS_CODE; +import static org.opensearch.client.transport.client_metrics.MetricName.ACTIVE_NODES; +import static org.opensearch.client.transport.client_metrics.MetricName.INACTIVE_NODES; +import static org.opensearch.client.transport.client_metrics.MetricName.NETWORK_REQUEST; +import static org.opensearch.client.transport.client_metrics.MetricName.REQUEST; +import static org.opensearch.client.transport.client_metrics.MetricName.REQUEST_PAYLOAD_SIZE; +import static org.opensearch.client.transport.client_metrics.MetricName.RESPONSE_PAYLOAD_SIZE; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import java.time.Duration; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.opensearch.client.transport.client_metrics.ExecutionMetricContext; +import org.opensearch.client.transport.client_metrics.MeterOptions; +import org.opensearch.client.transport.client_metrics.MetricGroup; +import org.opensearch.client.transport.client_metrics.MetricTag; +import org.opensearch.client.transport.client_metrics.NetworkRequestMetricContext; +import org.opensearch.client.transport.client_metrics.RequestMetricContext; + +public class TelemetryMetricsManager { + private static final Log logger = LogFactory.getLog(TelemetryMetricsManager.class); + private static final String EXCEPTION_PREFIX = "ClientError-"; + private static final AtomicInteger CLIENT_ID_GENERATOR = new AtomicInteger(0); + private static final CompositeMeterRegistry mainRegistry = new CompositeMeterRegistry(); + + private TelemetryMetricsManager() { + throw new IllegalStateException("Cannot instantiate a utility class"); + } + + public static int generateClientID() { + return CLIENT_ID_GENERATOR.getAndIncrement(); + } + + /** + * Add the provided registry to the main composite registry + * @param registry a {@link MeterRegistry} + */ + public static synchronized void addRegistry(MeterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("Cannot add a null registry"); + } + mainRegistry.add(registry); + } + + /** + * Remove the provided registry from the main composite registry. If the main registry has no + * child registries, it is cleared. + * @param registry a {@link MeterRegistry} + */ + public static synchronized void removeRegistry(MeterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("Cannot remove a null registry"); + } + mainRegistry.remove(registry); + if (mainRegistry.getRegistries().isEmpty()) { + mainRegistry.clear(); + } + } + + /** + * Record information for request-related metrics. + * + * @param requestName name of the executed request + * @param meterOptions options for configuring its meter + * @param context a {@link RequestMetricContext} object about the execution + * @param metricGroups {@link MetricGroup} groups to record + */ + public static void recordRequestMetrics( + String requestName, + MeterOptions meterOptions, + RequestMetricContext context, + Set metricGroups + ) { + if (requestName == null) { + throw new IllegalArgumentException("Request name cannot be null"); + } + if (context == null) { + throw new IllegalArgumentException("Metric context cannot be null"); + } + if (meterOptions == null) { + throw new IllegalArgumentException("Meter options cannot be null"); + } + if (metricGroups == null) { + metricGroups = MetricGroup.REQUIRED_GROUPS; + } + + Set excludedTags = Optional.ofNullable(meterOptions.getExcludedTagNames()).orElse(EnumSet.noneOf(MetricTag.class)); + recordOverallRequestMetric(meterOptions, context, getRequestTags(requestName, context), excludedTags); + if (metricGroups.contains(MetricGroup.NETWORK_DETAILS)) { + Tags payloadSizeTags = Tags.of(MetricTag.REQUEST.toString(), requestName).and(meterOptions.getCommonTags()); + for (NetworkRequestMetricContext networkRequestMetricContext : context.getNetworkRequestContexts()) { + recordNetworkRequestMetric( + meterOptions, + networkRequestMetricContext, + getRequestTags(requestName, networkRequestMetricContext), + excludedTags + ); + recordPayloadSizes(networkRequestMetricContext, payloadSizeTags); + } + } + } + + public static void initializeNodeGauges( + MeterOptions meterOptions, + Supplier activeNodeUpdater, + Supplier inactiveNodeUpdater + ) { + if (meterOptions == null) { + throw new IllegalArgumentException("Meter options cannot be null"); + } + if (activeNodeUpdater == null) { + throw new IllegalArgumentException("activeNodeUpdater cannot be null"); + } + if (inactiveNodeUpdater == null) { + throw new IllegalArgumentException("inactiveNodeUpdater cannot be null"); + } + Tags tags = Tags.empty().and(meterOptions.getCommonTags()); + Gauge.builder(ACTIVE_NODES.toString(), activeNodeUpdater) + .description("Number of active nodes to serve traffic") + .tags(tags) + .baseUnit("nodes") + .register(mainRegistry); + Gauge.builder(INACTIVE_NODES.toString(), inactiveNodeUpdater) + .description("Number of inactive nodes that cannot serve traffic") + .tags(tags) + .baseUnit("nodes") + .register(mainRegistry); + } + + protected static void recordPayloadSizes(NetworkRequestMetricContext context, Tags tags) { + if (tags == null) { + tags = Tags.empty(); + } + if (context == null) { + throw new IllegalArgumentException("Metric context cannot be null"); + } + + long requestPayloadSize = context.getRequestPayloadSize(); + long responsePayloadSize = context.getResponsePayloadSize(); + if (requestPayloadSize > -1) { + DistributionSummary.builder(REQUEST_PAYLOAD_SIZE.toString()) + .tags(tags) + .description("Request payload size") + .baseUnit("bytes") + .register(mainRegistry) + .record(requestPayloadSize); + } + if (responsePayloadSize > -1) { + DistributionSummary.builder(RESPONSE_PAYLOAD_SIZE.toString()) + .tags(tags) + .description("Response payload size") + .baseUnit("bytes") + .register(mainRegistry) + .record(responsePayloadSize); + } + } + + private static void recordNetworkRequestMetric( + MeterOptions meterOptions, + NetworkRequestMetricContext context, + Tags requestTags, + Set excludedTags + ) { + if (context.getRequestExecutionTime() != null) { + Tags networkRequestTags = excludedTags.contains(MetricTag.HOST) + ? requestTags + : requestTags.and(Tag.of(MetricTag.HOST.toString(), context.getHostName())); + Timer.builder(NETWORK_REQUEST.toString()) + .description("Apache HttpClient request latency") + .publishPercentiles(meterOptions.getPercentiles()) + .tags(networkRequestTags.and(meterOptions.getCommonTags())) + .maximumExpectedValue(Duration.ofSeconds(30)) + .register(mainRegistry) + .record(context.getRequestExecutionTime()); + } else if (logger.isDebugEnabled()) { + logger.debug("Missing execution duration. Skipping " + NETWORK_REQUEST); + } + } + + private static void recordOverallRequestMetric( + MeterOptions meterOptions, + RequestMetricContext context, + Tags requiredRequestTags, + Set excludedTags + ) { + if (context.getRequestExecutionTime() != null) { + Tags requestMeterTags = excludedTags.contains(MetricTag.HOST_CONTACTED) + ? requiredRequestTags + : requiredRequestTags.and(Tag.of(MetricTag.HOST_CONTACTED.toString(), context.getContactedHosts())); + Timer.builder(REQUEST.toString()) + .description("End-to-end request execution latency") + .publishPercentiles(meterOptions.getPercentiles()) + .tags(requestMeterTags.and(meterOptions.getCommonTags())) + .maximumExpectedValue(Duration.ofSeconds(30)) + .register(mainRegistry) + .record(context.getRequestExecutionTime()); + } else if (logger.isDebugEnabled()) { + logger.debug("Missing execution duration. Skipping " + REQUEST); + } + } + + /** + * Compose required {@link Tags} for metrics {@link MetricName#REQUEST} and {@link MetricName#NETWORK_REQUEST} + * based on the provided information. + * + *

+ * Required tags: {@link MetricTag#REQUEST}, {@link MetricTag#STATUS_CODE_OR_EXCEPTION} + *

+ * + * @param requestName name of the executed request + * @param context a {@link RequestMetricContext} object about the execution + * @return {@link Tags} + */ + private static Tags getRequestTags(String requestName, ExecutionMetricContext context) { + List tagList = new ArrayList<>(); + tagList.add(Tag.of(MetricTag.REQUEST.toString(), requestName == null ? "" : requestName)); + tagList.add(Tag.of(MetricTag.STATUS_CODE_OR_EXCEPTION.toString(), extractStatusCodeOrException(context))); + return Tags.of(tagList); + } + + private static String extractStatusCodeOrException(ExecutionMetricContext context) { + if (context != null) { + if (context.getStatusCode() != DEFAULT_EMPTY_STATUS_CODE) { + return String.valueOf(context.getStatusCode()); + } else if (context.getThrowable() != null) { + Throwable error = context.getThrowable(); + if (error.getCause() != null) { + return EXCEPTION_PREFIX + error.getCause().getClass().getSimpleName(); + } + return EXCEPTION_PREFIX + error.getClass().getSimpleName(); + } + } + return "UNKNOWN"; + } +} \ No newline at end of file diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java index 2966473db..2adfc51fe 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -8,6 +8,8 @@ package org.opensearch.client.transport.httpclient5; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import jakarta.json.stream.JsonGenerator; import jakarta.json.stream.JsonParser; import java.io.ByteArrayInputStream; @@ -20,11 +22,13 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -42,6 +46,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; @@ -93,6 +98,12 @@ import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.TransportException; import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.client_metrics.MeterOptions; +import org.opensearch.client.transport.client_metrics.MetricGroup; +import org.opensearch.client.transport.client_metrics.MetricOptions; +import org.opensearch.client.transport.client_metrics.NetworkRequestMetricContext; +import org.opensearch.client.transport.client_metrics.RequestMetricContext; +import org.opensearch.client.transport.client_metrics.TelemetryMetricsManager; import org.opensearch.client.transport.endpoints.BooleanEndpoint; import org.opensearch.client.transport.endpoints.BooleanResponse; import org.opensearch.client.transport.httpclient5.internal.HttpUriRequestProducer; @@ -100,6 +111,10 @@ import org.opensearch.client.transport.httpclient5.internal.NodeSelector; import org.opensearch.client.util.MissingRequiredPropertyException; +import static org.opensearch.client.transport.client_metrics.ExecutionMetricContext.DEFAULT_EMPTY_STATUS_CODE; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; +import static org.opensearch.client.transport.client_metrics.MetricTag.CLIENT_ID; + /** * Apache HttpClient 5 based client transport. */ @@ -121,6 +136,12 @@ public class ApacheHttpClient5Transport implements OpenSearchTransport { private final String pathPrefix; private final List
defaultHeaders; + private final boolean isMetricsEnabled; + private MeterOptions meterOptions; + private MeterRegistry meterRegistry; + private Set metricGroups = EnumSet.copyOf(MetricGroup.REQUIRED_GROUPS); + private String clientID = "NONE"; + public ApacheHttpClient5Transport( final CloseableHttpAsyncClient client, final Header[] defaultHeaders, @@ -132,7 +153,8 @@ public ApacheHttpClient5Transport( final NodeSelector nodeSelector, final boolean strictDeprecationMode, final boolean compressionEnabled, - final boolean chunkedEnabled + final boolean chunkedEnabled, + final MetricOptions metricOptions ) { this.mapper = mapper; this.client = client; @@ -145,6 +167,22 @@ public ApacheHttpClient5Transport( this.chunkedEnabled = chunkedEnabled; this.compressionEnabled = compressionEnabled; setNodes(nodes); + + if (metricOptions != null && metricOptions.isMetricsEnabled()) { + if (metricOptions.getClientId() != null && !metricOptions.getClientId().isEmpty()) { + this.clientID = metricOptions.getClientId(); + } + if (metricOptions.getMetricGroups() != null) { + this.metricGroups.addAll(metricOptions.getMetricGroups()); + } + this.isMetricsEnabled = metricOptions.isMetricsEnabled(); + this.meterRegistry = metricOptions.getMeterRegistry() == null ? DEFAULT_REGISTRY : metricOptions.getMeterRegistry(); + this.meterOptions = new MeterOptions(metricOptions.getPercentiles(), getCommonTags(), metricOptions.getExcludedTags()); + TelemetryMetricsManager.addRegistry(this.meterRegistry); + TelemetryMetricsManager.initializeNodeGauges(this.meterOptions, getActiveNodeGaugeUpdater(), getInactiveNodeGaugeUpdater()); + } else { + this.isMetricsEnabled = false; + } } @Override @@ -175,12 +213,14 @@ public CompletableFuture performRequest ) { final ApacheHttpClient5Options requestOptions = (options == null) ? transportOptions : ApacheHttpClient5Options.of(options); - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new OpenSearchRequestFuture<>(); final HttpUriRequestBase clientReq = prepareLowLevelRequest(request, endpoint, requestOptions); final WarningsHandler warningsHandler = (requestOptions.getWarningsHandler() == null) ? this.warningsHandler : requestOptions.getWarningsHandler(); + final long executionStartTime = System.currentTimeMillis(); + try { performRequestAsync(nextNodes(), requestOptions, clientReq, warningsHandler, future); } catch (final IOException ex) { @@ -193,6 +233,15 @@ public CompletableFuture performRequest } catch (final IOException ex) { throw new CompletionException(ex); } + }).whenComplete((responseT, throwable) -> { + RequestMetricContext context = ((OpenSearchRequestFuture) future).getContext(); + if (throwable != null) { + context.setThrowable(throwable); + } + context.setRequestExecutionTime(Duration.ofMillis(System.currentTimeMillis() - executionStartTime)); + if (isMetricsEnabled) { + TelemetryMetricsManager.recordRequestMetrics(request.getClass().getSimpleName(), meterOptions, context, metricGroups); + } }); } @@ -206,9 +255,24 @@ public TransportOptions options() { return transportOptions; } + public boolean isMetricsEnabled() { + return isMetricsEnabled; + } + + public String getClientID() { + return clientID; + } + + protected MeterOptions getMeterOptions() { + return meterOptions; + } + @Override public void close() throws IOException { client.close(); + if (meterRegistry != null) { + TelemetryMetricsManager.removeRegistry(meterRegistry); + } } private void performRequestAsync( @@ -219,6 +283,8 @@ private void performRequestAsync( final CompletableFuture listener ) { final RequestContext context = createContextForNextAttempt(options, request, nodeTuple.nodes.next(), nodeTuple.authCache); + final String hostName = context.node.getHost().getHostName() + ":" + context.node.getHost().getPort(); + final long startTime = System.currentTimeMillis(); Future future = client.execute( context.requestProducer, context.asyncResponseConsumer, @@ -226,7 +292,15 @@ private void performRequestAsync( new FutureCallback() { @Override public void completed(ClassicHttpResponse httpResponse) { + long endTime = System.currentTimeMillis(); try { + prepareRequestMetricContext( + ((OpenSearchRequestFuture) listener).getContext(), + request, + httpResponse, + hostName, + endTime - startTime + ); ResponseOrResponseException responseOrResponseException = convertResponse( request, context.node, @@ -249,7 +323,17 @@ public void completed(ClassicHttpResponse httpResponse) { @Override public void failed(Exception failure) { + long endTime = System.currentTimeMillis(); try { + ((OpenSearchRequestFuture) listener).getContext() + .addNetworkRequestContext( + new NetworkRequestMetricContext( + hostName, + failure, + DEFAULT_EMPTY_STATUS_CODE, + Duration.ofMillis(endTime - startTime) + ) + ); onFailure(context.node); if (nodeTuple.nodes.hasNext()) { performRequestAsync(nodeTuple, options, request, warningsHandler, listener); @@ -263,7 +347,17 @@ public void failed(Exception failure) { @Override public void cancelled() { - listener.completeExceptionally(new CancellationException("request was cancelled")); + CancellationException cancellationException = new CancellationException("request was cancelled"); + ((OpenSearchRequestFuture) listener).getContext() + .addNetworkRequestContext( + new NetworkRequestMetricContext( + hostName, + cancellationException, + DEFAULT_EMPTY_STATUS_CODE, + Duration.ofMillis(System.currentTimeMillis() - startTime) + ) + ); + listener.completeExceptionally(cancellationException); } } ); @@ -273,6 +367,30 @@ public void cancelled() { } } + private void prepareRequestMetricContext( + RequestMetricContext requestMetricContext, + HttpUriRequestBase request, + ClassicHttpResponse httpResponse, + String hostName, + long executionTimeMS + ) { + NetworkRequestMetricContext networkRequestMetricContext = new NetworkRequestMetricContext( + hostName, + null, + httpResponse.getCode(), + Duration.ofMillis(executionTimeMS) + ); + // Because we compress data as it is being streamed, we can't obtain compressed size efficiently + if (request.getEntity() != null && !compressionEnabled) { + networkRequestMetricContext.setRequestPayloadSize(request.getEntity().getContentLength()); + } + if (httpResponse.getEntity() != null) { + networkRequestMetricContext.setResponsePayloadSize(httpResponse.getEntity().getContentLength()); + } + requestMetricContext.setStatusCode(httpResponse.getCode()); + requestMetricContext.addNetworkRequestContext(networkRequestMetricContext); + } + /** * Replaces the nodes with which the client communicates. * @@ -824,6 +942,33 @@ private static URI buildUri(String pathPrefix, String path, Map } } + private Tags getCommonTags() { + return Tags.of(CLIENT_ID.toString(), getClientID()); + } + + private Supplier getActiveNodeGaugeUpdater() { + return () -> { + int numActive = 0; + for (Node node : this.nodeTuple.nodes) { + if (!denylist.containsKey(node.getHost())) { + numActive += 1; + } + } + return numActive; + }; + } + + private Supplier getInactiveNodeGaugeUpdater() { + return () -> { + int numInactive = 0; + for (Node node : this.nodeTuple.nodes) { + if (denylist.containsKey(node.getHost())) { + numInactive += 1; + } + } + return numInactive; + }; + } private static class RequestContext { private final Node node; private final AsyncRequestProducer requestProducer; diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java index 334a48a89..05946acf7 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java @@ -39,6 +39,7 @@ import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.client_metrics.MetricOptions; import org.opensearch.client.transport.httpclient5.internal.Node; import org.opensearch.client.transport.httpclient5.internal.NodeSelector; @@ -77,6 +78,7 @@ public class ApacheHttpClient5TransportBuilder { private Optional chunkedEnabled; private JsonpMapper mapper; private TransportOptions options; + private MetricOptions metricOptions; /** * Creates a new builder instance and sets the hosts that the client will send requests to. @@ -261,6 +263,11 @@ public ApacheHttpClient5TransportBuilder setChunkedEnabled(boolean chunkedEnable return this; } + public ApacheHttpClient5TransportBuilder setMetricOptions(MetricOptions metricOptions) { + this.metricOptions = metricOptions; + return this; + } + /** * Creates a new {@link RestClient} based on the provided configuration. */ @@ -287,7 +294,8 @@ public ApacheHttpClient5Transport build() { nodeSelector, strictDeprecationMode, compressionEnabled, - chunkedEnabled.orElse(false) + chunkedEnabled.orElse(false), + metricOptions ); httpClient.start(); diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java new file mode 100644 index 000000000..49b2f1ea6 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java @@ -0,0 +1,12 @@ +package org.opensearch.client.transport.httpclient5; + +import java.util.concurrent.CompletableFuture; +import org.opensearch.client.transport.client_metrics.RequestMetricContext; + +public class OpenSearchRequestFuture extends CompletableFuture { + private final RequestMetricContext context = new RequestMetricContext(); + + public RequestMetricContext getContext() { + return context; + } +} \ No newline at end of file diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java new file mode 100644 index 000000000..4243e6f9b --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java @@ -0,0 +1,466 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest; + +import static org.awaitility.Awaitility.await; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hc.client5.http.HttpHostConnectException; +import org.apache.http.HttpHost; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.Refresh; +import org.opensearch.client.opensearch.cluster.HealthRequest; +import org.opensearch.client.opensearch.cluster.HealthResponse; +import org.opensearch.client.opensearch.core.CreateRequest; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.IndexResponse; +import org.opensearch.client.opensearch.core.UpdateRequest; +import org.opensearch.client.opensearch.core.UpdateResponse; +import org.opensearch.client.opensearch.integTest.AbstractCrudIT.AppData; +import org.opensearch.client.transport.client_metrics.MetricGroup; +import org.opensearch.client.transport.client_metrics.MetricName; +import org.opensearch.client.transport.client_metrics.MetricTag; +import org.opensearch.client.transport.client_metrics.TelemetryMetricsManager; +import org.opensearch.client.transport.httpclient5.ResponseException; +import org.opensearch.common.settings.Settings; + +public abstract class AbstractClientMetricsIT extends OpenSearchJavaClientTestCase { + private static final HttpHost[] BAD_HOSTS = new HttpHost[] { new HttpHost("localhost", 9201), new HttpHost("localhost", 9202) }; + private static final List ALL_GROUPS = MetricGroup.ALL.stream().map(Enum::toString).collect(Collectors.toList()); + private static final Settings ALL_METRIC_GROUP_SETTING = Settings.builder().putList(METRICS_GROUPS, ALL_GROUPS).build(); + + public void testDefaultMetricGroup() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + Future clusterHealthFuture = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(Settings.EMPTY)) + .cluster() + .health(); + await().atMost(2, TimeUnit.SECONDS).until(clusterHealthFuture::isDone); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + ).size() + ); + assertEquals(1, findMeter(getStubRegistry(), MetricName.ACTIVE_NODES.toString(), Tags.empty()).size()); + assertEquals(1, findMeter(getStubRegistry(), MetricName.INACTIVE_NODES.toString(), Tags.empty()).size()); + } + + public void testAllMetricGroups() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String index = "index_metrics"; + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testMetrics"); + Future indexFuture = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)) + .index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); + await().atMost(2, TimeUnit.SECONDS).until(indexFuture::isDone); + + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + assertEquals(1, findMeter(getStubRegistry(), MetricName.ACTIVE_NODES.toString(), Tags.empty()).size()); + assertEquals(1, findMeter(getStubRegistry(), MetricName.INACTIVE_NODES.toString(), Tags.empty()).size()); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + } + + public void testRequestMetricResponse() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + Future clusterHealthFuture = getCustomAsyncClient( + getDefaultHosts(), + restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING) + ).cluster().health(); + await().atMost(2, TimeUnit.SECONDS).until(clusterHealthFuture::isDone); + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Meter clusterHealthMeter = meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Meter clusterHealthNetworkMeter = meter.get(0); + + for (Meter requestMeter : Arrays.asList(clusterHealthMeter, clusterHealthNetworkMeter)) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals(1, ((Timer) requestMeter).count()); + assertEquals("200", statusCodeTag); + } + + String[] hostsContacted = clusterHealthMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + assertArrayEquals(new String[] { getTestRestCluster() }, hostsContacted); + String hostContacted = clusterHealthNetworkMeter.getId().getTag(MetricTag.HOST.toString()); + assertEquals(getTestRestCluster(), hostContacted); + } + + public void testRequestMetricResponseExceptionThrown() throws IOException, ExecutionException, InterruptedException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String index = "index_metrics"; + OpenSearchAsyncClient client = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)); + // Force 409 + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricResponseException"); + client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)).get(); + try { + CreateRequest duplicateCreateRequest = new CreateRequest.Builder().index(index) + .id(id) + .document(appData) + .build(); + client.create(duplicateCreateRequest).get(); + fail("Should have failed due to version conflict"); + } catch (ExecutionException e) { + if (e.getCause() instanceof ResponseException) { + assertTrue(e.getMessage().contains("version conflict")); + } else { + fail("Unrecognized cause: " + e.getCause()); + } + } + // Check metrics + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer createMeter = (Timer) meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer createNetworkMeter = (Timer) meter.get(0); + + for (Timer requestMeter : Arrays.asList(createMeter, createNetworkMeter)) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals("409", statusCodeTag); + assertEquals(1, requestMeter.count()); + } + + String[] hostsContacted = createMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + assertArrayEquals(new String[] { getTestRestCluster() }, hostsContacted); + + String hostContacted = createNetworkMeter.getId().getTag(MetricTag.HOST.toString()); + assertEquals(getTestRestCluster(), hostContacted); + } + + public void testRequestMetricResponseExceptionHandled() throws IOException, InterruptedException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String malformedIndexName = "index_Metrics"; + // Force 400 for SimpleEndpoint + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricResponseExceptionHandled"); + try { + getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)).index( + b -> b.index(malformedIndexName).id(id).document(appData).refresh(Refresh.True) + ).get(); + fail("Should have failed due to invalid index name"); + } catch (ExecutionException e) { + if (e.getCause() instanceof OpenSearchException) { + assertTrue(e.getMessage().contains("invalid_index_name_exception")); + } else { + fail("Unrecognized cause: " + e.getCause()); + } + } + // Check metrics + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer indexMeter = (Timer) meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer indexNetworkMeter = (Timer) meter.get(0); + + for (Timer requestMeter : Arrays.asList(indexMeter, indexNetworkMeter)) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals("400", statusCodeTag); + assertEquals(1, requestMeter.count()); + } + + String[] hostsContacted = indexMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + assertArrayEquals(new String[] { getTestRestCluster() }, hostsContacted); + + String hostContacted = indexNetworkMeter.getId().getTag(MetricTag.HOST.toString()); + assertEquals(getTestRestCluster(), hostContacted); + } + + public void testRequestMetricExceptionThrown() throws IOException, InterruptedException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String indexName = "index_metrics"; + // Force HttpHostConnectException + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricClientExceptionThrown"); + + try { + getCustomAsyncClient(BAD_HOSTS, restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)).index( + b -> b.index(indexName).id(id).document(appData).refresh(Refresh.True) + ).get(); + fail("Should have failed due to bad host"); + } catch (ExecutionException e) { + System.out.println("Caught " + e.getMessage()); + if (!(e.getCause() instanceof HttpHostConnectException)) { + fail("Recognized cause: " + e.getCause()); + } + } + // Check metric + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer indexMeter = (Timer) meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(2, meter.size()); + + for (Timer requestMeter : Arrays.asList(indexMeter, (Timer) meter.get(0), (Timer) meter.get(1))) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals("ClientError-HttpHostConnectException", statusCodeTag); + assertEquals(1, requestMeter.count()); + } + + String[] hostsContacted = indexMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + String[] expectedHosts = new String[] { "localhost:9201", "localhost:9202" }; + assertArrayEquals(expectedHosts, hostsContacted); + + List networkContactedHosts = new ArrayList<>(List.of(expectedHosts)); + networkContactedHosts.remove(meter.get(0).getId().getTag(MetricTag.HOST.toString())); + networkContactedHosts.remove(meter.get(1).getId().getTag(MetricTag.HOST.toString())); + assertTrue(networkContactedHosts.isEmpty()); + } + + public void testNodeGauges() throws InterruptedException, IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String clientID = "badHostClient"; + OpenSearchAsyncClient client = getCustomAsyncClient( + BAD_HOSTS, + restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) + ); + // Verify active nodes + List activeNodeMeter = findMeter( + getStubRegistry(), + MetricName.ACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, activeNodeMeter.size()); + assertEquals(2, ((Gauge) activeNodeMeter.get(0)).value(), 0); + // Verify inactive nodes + List inactiveNodeMeter = findMeter( + getStubRegistry(), + MetricName.INACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, inactiveNodeMeter.size()); + assertEquals(0, ((Gauge) inactiveNodeMeter.get(0)).value(), 0); + + // Force HttpHostConnectException + String indexName = "index_metrics"; + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricClientExceptionThrown"); + try { + client.index(b -> b.index(indexName).id(id).document(appData).refresh(Refresh.True)).get(); + fail("Should have failed due to bad host"); + } catch (ExecutionException | IOException e) { + System.out.println("Caught " + e.getMessage()); + if (!(e.getCause() instanceof HttpHostConnectException)) { + fail("Recognized cause: " + e.getCause()); + } + } + + // Verify active nodes + activeNodeMeter = findMeter( + getStubRegistry(), + MetricName.ACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, activeNodeMeter.size()); + assertEquals(0, ((Gauge) activeNodeMeter.get(0)).value(), 0); + // Verify inactive nodes + inactiveNodeMeter = findMeter( + getStubRegistry(), + MetricName.INACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, inactiveNodeMeter.size()); + assertEquals(2, ((Gauge) inactiveNodeMeter.get(0)).value(), 0); + } + + public void testPayloadSize() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String clientID = "compressingClient"; + OpenSearchAsyncClient client = getCustomAsyncClient( + Stream.concat(Arrays.stream(BAD_HOSTS), Arrays.stream(getDefaultHosts())).toArray(HttpHost[]::new), + restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) + ); + String index = "index_metrics"; + String id = UUID.randomUUID().toString(); + + // Index + AppData appData = new AppData(); + appData.setMsg(generatePayload(500)); + Future indexFuture = client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); + await().atMost(Duration.ofSeconds(2)).until(indexFuture::isDone); + DistributionSummary indexRequestPayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).get(0); + assertTrue(500 <= indexRequestPayload.max() && indexRequestPayload.max() <= 600); + assertEquals(1, indexRequestPayload.count()); + DistributionSummary indexResponsePayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).get(0); + assertEquals(1, indexResponsePayload.count()); + + // Update + appData.setMsg(generatePayload(5 * 1024)); + CompletableFuture> updateFuture = client.update(b -> b.doc(appData).index(index).id(id), AppData.class); + await().atMost(Duration.ofSeconds(2)).until(updateFuture::isDone); + DistributionSummary updateRequestPayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) + ).get(0); + assertTrue(5 * 1024 <= updateRequestPayload.max() && updateRequestPayload.max() <= (5 * 1024 + 100)); + assertEquals(1, updateRequestPayload.count()); + DistributionSummary updateResponsePayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) + ).get(0); + assertTrue(updateResponsePayload.max() > 0); + assertEquals(1, updateResponsePayload.count()); + } + + public void testNoRequestPayloadSizeCompressionEnabled() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String clientID = "compressingClient"; + OpenSearchAsyncClient client = getCustomAsyncClient( + getDefaultHosts(), + restClientSettingsWithMetrics( + Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).put(COMPRESSION_ENABLED, true).build() + ) + ); + String index = "index_metrics"; + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg(generatePayload(5 * 1024)); + Future indexFuture = client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); + await().atMost(Duration.ofSeconds(2)).until(indexFuture::isDone); + assertTrue( + findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).isEmpty() + ); + } + + private List findMeter(MeterRegistry registry, String meterName, Tags tags) { + return registry.getMeters().stream().filter(meter -> { + if (meter.getId().getName().equals(meterName)) { + if (tags != null && !tags.equals(Tags.empty())) { + for (Tag tag : tags) { + String tagValue = meter.getId().getTag(tag.getKey()); + if (tagValue == null || !tagValue.equalsIgnoreCase(tag.getValue())) { + return false; + } + } + } + return true; + } + return false; + }).collect(Collectors.toList()); + } + + private String generatePayload(int numBytes) { + if (numBytes < 0) { + throw new IllegalArgumentException("Negative payload size"); + } + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < numBytes; i++) { + builder.append("o"); + } + return builder.toString(); + } +} \ No newline at end of file diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java index e4825420a..037ae201d 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java @@ -8,6 +8,8 @@ package org.opensearch.client.opensearch.integTest; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -34,6 +36,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.opensearch.IOUtils; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.ExpandWildcard; import org.opensearch.client.opensearch.cat.IndicesResponse; @@ -42,6 +45,7 @@ import org.opensearch.client.opensearch.indices.DeleteIndexRequest; import org.opensearch.client.opensearch.nodes.NodesInfoResponse; import org.opensearch.client.opensearch.nodes.info.NodeInfo; +import org.opensearch.client.transport.client_metrics.TelemetryMetricsManager; import org.opensearch.common.settings.Settings; import org.opensearch.test.rest.OpenSearchRestTestCase; @@ -53,12 +57,20 @@ public abstract class OpenSearchJavaClientTestCase extends OpenSearchRestTestCas ".plugins-ml-model-group", ".ql-datasources" ); + + public static String METRICS_ENABLED = "metrics.enabled"; + public static String CUSTOM_CLIENT_ID = "custom.client.id"; + public static String METRICS_GROUPS = "metrics.groups"; + public static String COMPRESSION_ENABLED = "compression.enabled"; + private static final List customAsyncClients = new ArrayList<>(); private static OpenSearchClient javaClient; private static OpenSearchClient adminJavaClient; private static TreeSet nodeVersions; private static List clusterHosts; + private MeterRegistry stubRegistry = new SimpleMeterRegistry(); + @Before public void initJavaClient() throws IOException { if (javaClient == null) { @@ -88,6 +100,21 @@ public void initJavaClient() throws IOException { } } + protected HttpHost[] getDefaultHosts() { + if (clusterHosts != null) { + return clusterHosts.toArray(new HttpHost[clusterHosts.size()]); + } + return new HttpHost[0]; + } + + protected Settings restClientSettingsWithMetrics(Settings additionalMetricsSettings) { + Settings defaultSettings = Settings.builder().put(restClientSettings()).put(METRICS_ENABLED, true).build(); + if (additionalMetricsSettings == null || additionalMetricsSettings.isEmpty()) { + return defaultSettings; + } + return Settings.builder().put(defaultSettings).put(additionalMetricsSettings).build(); + } + @Override protected String getProtocol() { return isHttps() ? "https" : "http"; @@ -147,6 +174,11 @@ protected static OpenSearchClient adminJavaClient() { return adminJavaClient; } + protected synchronized OpenSearchAsyncClient getCustomAsyncClient(HttpHost[] hosts, Settings clientSettings) throws IOException { + OpenSearchAsyncClient customAsyncClient = buildAsyncJavaClient(clientSettings, hosts); + customAsyncClients.add(customAsyncClient); + return customAsyncClient; + } protected String getTestRestCluster() { String cluster = System.getProperty("tests.rest.cluster"); if (cluster == null) { @@ -155,6 +187,9 @@ protected String getTestRestCluster() { return cluster; } + public MeterRegistry getStubRegistry() { + return stubRegistry; + } @After protected void wipeAllOSIndices() throws IOException { // wipe all data streams first, otherwise deleting backing indices will encounter exception @@ -169,6 +204,8 @@ protected void wipeAllOSIndices() throws IOException { adminJavaClient().indices().delete(new DeleteIndexRequest.Builder().index(index.index()).build()); } } + TelemetryMetricsManager.removeRegistry(stubRegistry); + cleanUpCustomAsyncClients(); } @AfterClass @@ -194,6 +231,16 @@ protected boolean preserveIndicesUponCompletion() { return true; } + private synchronized void cleanUpCustomAsyncClients() { + if (!customAsyncClients.isEmpty()) { + for (OpenSearchAsyncClient client : customAsyncClients) { + try { + IOUtils.closeQueitly(client._transport()); + } catch (Exception ignored) {} + } + customAsyncClients.clear(); + } + } protected Version getServerVersion() throws IOException { final InfoResponse info = javaClient().info(); diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java index 22b6db5de..8d52bb5a4 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.util.Optional; import org.apache.hc.core5.http.HttpHost; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.common.settings.Settings; @@ -24,5 +25,9 @@ default OpenSearchClient buildJavaClient(Settings settings, HttpHost[] hosts) th return new OpenSearchClient(buildTransport(settings, hosts)); } + default OpenSearchAsyncClient buildAsyncJavaClient(Settings settings, HttpHost[] hosts) throws IOException { + return new OpenSearchAsyncClient(buildTransport(settings, hosts)); + } + OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException; } diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java new file mode 100644 index 000000000..a3e70dc6b --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.io.IOException; +import java.util.Optional; +import org.apache.hc.core5.http.HttpHost; +import org.junit.Test; +import org.opensearch.client.transport.client_metrics.MetricOptions; +import org.opensearch.client.transport.client_metrics.MetricTag; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; + +public class ApacheHttpClient5TransportTest { + @Test + public void testInitWithMetricsOptions() throws IOException { + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + String clientID = "testClient"; + HttpHost host = new HttpHost("localhost", 9200); + MetricOptions metricOptions = MetricOptions.builder() + .setMetricsEnabled(true) + .setMeterRegistry(meterRegistry) + .setClientId(clientID) + .setPercentiles(0.90, 0.8, 0.5) + .build(); + ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(host); + builder.setMetricOptions(metricOptions); + try (ApacheHttpClient5Transport transport = builder.build()) { + assertTrue(transport.isMetricsEnabled()); + assertArrayEquals(new double[] { 0.90, 0.8, 0.5 }, transport.getMeterOptions().getPercentiles(), 0); + Optional clientIDTag = transport.getMeterOptions() + .getCommonTags() + .stream() + .filter(tag -> tag.getKey().equals(MetricTag.CLIENT_ID.toString())) + .findFirst(); + assertFalse(clientIDTag.isEmpty()); + assertEquals(clientID, clientIDTag.get().getValue()); + assertEquals(clientID, transport.getClientID()); + } + } +} \ No newline at end of file diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java index 617304f83..bcfbe7afb 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java @@ -8,13 +8,19 @@ package org.opensearch.client.opensearch.integTest.httpclient5; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.COMPRESSION_ENABLED; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.CUSTOM_CLIENT_ID; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.METRICS_ENABLED; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.METRICS_GROUPS; import static org.opensearch.test.rest.OpenSearchRestTestCase.CLIENT_PATH_PREFIX; import static org.opensearch.test.rest.OpenSearchRestTestCase.CLIENT_SOCKET_TIMEOUT; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import java.util.List; import java.util.Map; import java.util.Optional; import javax.net.ssl.SSLContext; @@ -36,6 +42,8 @@ import org.apache.hc.core5.util.Timeout; import org.opensearch.client.opensearch.integTest.OpenSearchTransportSupport; import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.client_metrics.MetricGroup; +import org.opensearch.client.transport.client_metrics.MetricOptions; import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -108,5 +116,26 @@ public TlsDetails create(final SSLEngine sslEngine) { if (settings.hasValue(CLIENT_PATH_PREFIX)) { builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); } + if (settings.hasValue(METRICS_ENABLED) && settings.getAsBoolean(METRICS_ENABLED, false)) { + MetricOptions.MetricOptionsBuilder metricOptionsBuilder = MetricOptions.builder() + .setMeterRegistry(new SimpleMeterRegistry()) + .setPercentiles(0.95) + .setMetricsEnabled(true); + if (settings.hasValue(CUSTOM_CLIENT_ID)) { + metricOptionsBuilder.setClientId(settings.get(CUSTOM_CLIENT_ID)); + } + if (settings.hasValue(METRICS_GROUPS)) { + List metricGroupStrList = settings.getAsList(METRICS_GROUPS); + MetricGroup[] metricGroups = new MetricGroup[metricGroupStrList.size()]; + for (int j = 0; j < metricGroupStrList.size(); j++) { + metricGroups[j] = MetricGroup.valueOf(metricGroupStrList.get(j)); + } + metricOptionsBuilder.setAdditionalMetricGroups(metricGroups); + } + builder.setMetricOptions(metricOptionsBuilder.build()); + } + if (settings.hasValue(COMPRESSION_ENABLED)) { + builder.setCompressionEnabled(settings.getAsBoolean(COMPRESSION_ENABLED, false)); + } } } diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java new file mode 100644 index 000000000..38868fff9 --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import io.micrometer.core.instrument.Tags; +import java.util.EnumSet; +import org.junit.Test; + +public class MeterOptionsTest { + @Test + public void testMeterOptions() { + MeterOptions meterOptions = new MeterOptions( + new double[] { 0.80, 0.85 }, + Tags.of("test1", "test2"), + EnumSet.of(MetricTag.CLIENT_ID) + ); + assertArrayEquals(new double[] { 0.80, 0.85 }, meterOptions.getPercentiles(), 0); + assertEquals(Tags.of("test1", "test2"), meterOptions.getCommonTags()); + assertEquals(EnumSet.of(MetricTag.CLIENT_ID), meterOptions.getExcludedTagNames()); + } + + @Test + public void testMeterOptionsNoNull() { + MeterOptions meterOptions = new MeterOptions(null, null, null); + assertArrayEquals(MetricConstants.DEFAULT_PERCENTILES, meterOptions.getPercentiles(), 0.0); + assertEquals(Tags.empty(), meterOptions.getCommonTags()); + assertEquals(MetricConstants.DEFAULT_EXCLUDED_TAGS, meterOptions.getExcludedTagNames()); + } +} \ No newline at end of file diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java new file mode 100644 index 000000000..fa6dedf7f --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java @@ -0,0 +1,81 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_ADDITIONAL_METRIC_GROUPS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_EXCLUDED_TAGS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_PERCENTILES; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.EnumSet; +import org.junit.Test; + +public class MetricOptionsTest { + @Test + public void testBuildMetricOptions() { + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + String clientID = "testClient"; + MetricOptions metricOptions = MetricOptions.builder() + .setMetricsEnabled(true) + .setMeterRegistry(meterRegistry) + .setClientId(clientID) + .setPercentiles(0.90, 0.8, 0.5) + .setExcludedTags(MetricTag.HOST_CONTACTED, MetricTag.STATUS_CODE_OR_EXCEPTION) + .setAdditionalMetricGroups(MetricGroup.NETWORK_DETAILS) + .build(); + assertTrue(metricOptions.isMetricsEnabled()); + assertEquals(meterRegistry, metricOptions.getMeterRegistry()); + assertEquals(clientID, metricOptions.getClientId()); + assertArrayEquals(new double[] { 0.90, 0.8, 0.5 }, metricOptions.getPercentiles(), 0); + assertEquals(EnumSet.of(MetricTag.HOST_CONTACTED, MetricTag.STATUS_CODE_OR_EXCEPTION), metricOptions.getExcludedTags()); + assertEquals(EnumSet.of(MetricGroup.NETWORK_DETAILS), metricOptions.getMetricGroups()); + } + + @Test + public void testClientIDAlwaysNotNullOrEmpty() { + MetricOptions metricOptions = MetricOptions.builder().build(); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + + metricOptions = MetricOptions.builder().setClientId(null).build(); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + + metricOptions = MetricOptions.builder().setClientId("").build(); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + } + + @Test + public void testDefaults() { + MetricOptions metricOptions = MetricOptions.builder().build(); + validateDefaults(metricOptions); + + metricOptions = MetricOptions.builder().setMeterRegistry(null).setClientId(null).build(); + validateDefaults(metricOptions); + } + + private void validateDefaults(MetricOptions metricOptions) { + assertFalse(metricOptions.isMetricsEnabled()); + assertEquals(DEFAULT_REGISTRY, metricOptions.getMeterRegistry()); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + assertArrayEquals(DEFAULT_PERCENTILES, metricOptions.getPercentiles(), 0); + assertEquals(DEFAULT_EXCLUDED_TAGS, metricOptions.getExcludedTags()); + assertEquals(DEFAULT_ADDITIONAL_METRIC_GROUPS, metricOptions.getMetricGroups()); + } +} \ No newline at end of file diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java new file mode 100644 index 000000000..5298c02a2 --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java @@ -0,0 +1,288 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.io.IOException; +import java.net.http.HttpConnectTimeoutException; +import java.time.Duration; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.client.transport.TransportException; + +public class TelemetryMetricsManagerTest { + private MeterRegistry stubRegistry; + + @Before + public void setUp() { + stubRegistry = new SimpleMeterRegistry(); + TelemetryMetricsManager.addRegistry(stubRegistry); + } + + @After + public void cleanUp() { + TelemetryMetricsManager.removeRegistry(stubRegistry); + } + + @Test + public void testRecordingRequestMetric() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + Duration totalExecLatency = Duration.ofMillis(100); + Duration errorLatency = Duration.ofMillis(1); + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + context.setRequestExecutionTime(totalExecLatency); + context.setStatusCode(200); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost", new HttpConnectTimeoutException("error"), -1, errorLatency) + ); + context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost2", null, 200, totalExecLatency.minus(errorLatency))); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer requestMeter = (Timer) meter.get(); + assertEquals(requestName, requestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("200", requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("localhost,localhost2", requestMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", requestMeter.getId().getTag("CommonTag")); + assertEquals(totalExecLatency.toMillis(), requestMeter.totalTime(TimeUnit.MILLISECONDS), 0); + assertEquals(1, requestMeter.count()); + + // Verify NETWORK_REQUEST meter + List meters = findMeters( + stubRegistry, + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), requestName) + ); + Set hosts = new HashSet<>(Arrays.asList("localhost", "localhost2")); + assertEquals(2, meters.size()); + for (Meter networkRequestMeter : meters) { + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + assertEquals(1, ((Timer) networkRequestMeter).count()); + + String host = networkRequestMeter.getId().getTag(MetricTag.HOST.toString()); + assertNotNull(host); + hosts.remove(host); + String status = networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + if (host.equals("localhost")) { + assertEquals("ClientError-HttpConnectTimeoutException", status); + assertEquals(errorLatency.toMillis(), ((Timer) networkRequestMeter).totalTime(TimeUnit.MILLISECONDS), 0); + } else { + assertEquals("200", status); + assertEquals( + totalExecLatency.minus(errorLatency).toMillis(), + ((Timer) networkRequestMeter).totalTime(TimeUnit.MILLISECONDS), + 0 + ); + } + } + assertTrue("There are unaccounted hosts", hosts.isEmpty()); + } + + @Test + public void testRecordingRequestMetricWithExceptionNoStatus() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + context.setRequestExecutionTime(Duration.ofMillis(50)); + context.setThrowable(new IOException("IO Errors")); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost", new IOException("IO Errors"), -1, Duration.ofMillis(1)) + ); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer requestMeter = (Timer) meter.get(); + assertEquals(requestName, requestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("ClientError-IOException", requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("localhost", requestMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", requestMeter.getId().getTag("CommonTag")); + + // Verify NETWORK_REQUEST meter + meter = findMeter(stubRegistry, MetricName.NETWORK_REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer networkRequestMeter = (Timer) meter.get(); + assertEquals("localhost", networkRequestMeter.getId().getTag(MetricTag.HOST.toString())); + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("ClientError-IOException", networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + } + + @Test + public void testRecordingRequestMetricWithBothStatusAndException() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + context.setRequestExecutionTime(Duration.ofMillis(150)); + context.setStatusCode(409); + context.setThrowable(new TransportException("Error")); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost", new TransportException("Error"), 409, Duration.ofMillis(1)) + ); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer testMeter = (Timer) meter.get(); + assertEquals(requestName, testMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("409", testMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("localhost", testMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", testMeter.getId().getTag("CommonTag")); + + // Verify NETWORK_REQUEST meter + meter = findMeter(stubRegistry, MetricName.NETWORK_REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer networkRequestMeter = (Timer) meter.get(); + assertEquals("localhost", networkRequestMeter.getId().getTag(MetricTag.HOST.toString())); + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("409", networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + } + + @Test + public void testRecordingRequestMetricWithTagExclusion() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + MeterOptions meterOptions = new MeterOptions( + new double[] { 0.80, 0.85 }, + Tags.of("CommonTag", "CommonTagValue"), + EnumSet.of(MetricTag.HOST_CONTACTED, MetricTag.HOST) + ); + context.setRequestExecutionTime(Duration.ofMillis(150)); + context.setStatusCode(409); + context.setThrowable(new TransportException("Error")); + context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost", null, 409, Duration.ofMillis(1))); + context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost1", null, 409, Duration.ofMillis(1))); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost2", new TransportException("Error"), 409, Duration.ofMillis(1)) + ); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer testMeter = (Timer) meter.get(); + assertEquals(requestName, testMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("409", testMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertNull(testMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", testMeter.getId().getTag("CommonTag")); + + // Verify NETWORK_REQUEST meter + List meters = findMeters(stubRegistry, MetricName.NETWORK_REQUEST.toString(), Tags.of("CommonTag", "CommonTagValue")); + assertEquals(1, meters.size()); + Timer networkRequestMeter = (Timer) meters.get(0); + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + assertEquals("409", networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertNull(networkRequestMeter.getId().getTag(MetricTag.HOST.toString())); + } + + @Test + public void testNoPayLoadSize() { + NetworkRequestMetricContext context = new NetworkRequestMetricContext("temp", null, 0, Duration.ZERO); + TelemetryMetricsManager.recordPayloadSizes(context, Tags.empty()); + assertTrue(findMeter(stubRegistry, MetricName.REQUEST_PAYLOAD_SIZE.toString(), Tags.empty()).isEmpty()); + assertTrue(findMeter(stubRegistry, MetricName.RESPONSE_PAYLOAD_SIZE.toString(), Tags.empty()).isEmpty()); + } + + @Test + public void testMultipleNetworkContextsWithServerResponses() { + String requestName = "testRequest"; + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + + NetworkRequestMetricContext networkRequestMetricContextFail = new NetworkRequestMetricContext( + "host1", + null, + 500, + Duration.ofMillis(10) + ); + networkRequestMetricContextFail.setRequestPayloadSize(500); + + NetworkRequestMetricContext networkRequestMetricContextSuccess = new NetworkRequestMetricContext( + "host1", + null, + 200, + Duration.ofMillis(20) + ); + networkRequestMetricContextSuccess.setRequestPayloadSize(500); + networkRequestMetricContextSuccess.setResponsePayloadSize(200); + + RequestMetricContext requestMetricContext = new RequestMetricContext(); + requestMetricContext.addNetworkRequestContext(networkRequestMetricContextFail); + requestMetricContext.addNetworkRequestContext(networkRequestMetricContextSuccess); + requestMetricContext.setRequestExecutionTime(Duration.ofMillis(30)); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, requestMetricContext, MetricGroup.ALL); + + DistributionSummary requestPayloadMeter = (DistributionSummary) findMeter( + stubRegistry, + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.empty() + ).get(); + assertEquals(2, requestPayloadMeter.count()); + DistributionSummary responsePayloadMeter = (DistributionSummary) findMeter( + stubRegistry, + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.empty() + ).get(); + assertEquals(1, responsePayloadMeter.count()); + } + + private List findMeters(MeterRegistry registry, String meterName, Tags tags) { + return registry.getMeters().stream().filter(meter -> { + if (meter.getId().getName().equals(meterName)) { + if (tags != null && !tags.equals(Tags.empty())) { + for (Tag tag : tags) { + String tagValue = meter.getId().getTag(tag.getKey()); + if (tagValue == null || !tagValue.equalsIgnoreCase(tag.getValue())) { + return false; + } + } + } + return true; + } + return false; + }).collect(Collectors.toList()); + } + + private Optional findMeter(MeterRegistry registry, String meterName, Tags tags) { + return findMeters(registry, meterName, tags).stream().findFirst(); + } +} \ No newline at end of file From f722d8481515ae24c80d2c64a369b001b93bc4be Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 20 May 2025 10:23:11 +0530 Subject: [PATCH 03/11] updated the CHANGELOG.md Signed-off-by: psingh3 --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3768a9cc6..113491b58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,11 +9,12 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Bump `com.carrotsearch.randomizedtesting:randomizedtesting-runner` from 2.8.2 to 2.8.3 ([#1487](https://github.com/opensearch-project/opensearch-java/pull/1487)) - Bump `org.apache.httpcomponents.client5:httpclient5` from 5.4.3 to 5.4.4 ([#1544](https://github.com/opensearch-project/opensearch-java/pull/1544)) - Bump `org.opensearch.gradle:build-tools` from 3.0.0-alpha1-SNAPSHOT to 3.1.0-SNAPSHOT ([#1543](https://github.com/opensearch-project/opensearch-java/pull/1543)) - -This section is for maintaining a changelog for all breaking changes for the client that cannot be released in the 2.x line. All other non-breaking changes should be added to [Unreleased 2.x] section. +- Added micrometer dependency `io.micrometer:micrometer-core` version 1.12.3 + This section is for maintaining a changelog for all breaking changes for the client that cannot be released in the 2.x line. All other non-breaking changes should be added to [Unreleased 2.x] section. ### Added - Document HTTP/2 support ([#330](https://github.com/opensearch-project/opensearch-java/pull/330)) +- Metrics support includes micrometer integration and Prometheus support with a custom client-side metrics. ### Dependencies From 4975cc35ae9a9a8d85667425d3280c237cab83f0 Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 20 May 2025 10:42:42 +0530 Subject: [PATCH 04/11] fix file format violations Signed-off-by: psingh3 --- .../ExecutionMetricContext.java | 10 +- .../client_metrics/MeterOptions.java | 10 +- .../client_metrics/MetricConstants.java | 10 +- .../transport/client_metrics/MetricGroup.java | 10 +- .../transport/client_metrics/MetricName.java | 10 +- .../client_metrics/MetricOptions.java | 14 +- .../transport/client_metrics/MetricTag.java | 10 +- .../NetworkRequestMetricContext.java | 10 +- .../client_metrics/RequestMetricContext.java | 10 +- .../TelemetryMetricsManager.java | 122 +++++------ .../ApacheHttpClient5Transport.java | 69 +++--- .../httpclient5/OpenSearchRequestFuture.java | 10 +- .../integTest/AbstractClientMetricsIT.java | 204 +++++++++--------- .../OpenSearchJavaClientTestCase.java | 3 + .../ApacheHttpClient5TransportTest.java | 20 +- .../HttpClient5TransportSupport.java | 6 +- .../client_metrics/MeterOptionsTest.java | 8 +- .../client_metrics/MetricOptionsTest.java | 17 +- .../TelemetryMetricsManagerTest.java | 56 ++--- 19 files changed, 347 insertions(+), 262 deletions(-) diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java index 41b1805a8..7e128edf3 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import java.time.Duration; @@ -43,4 +51,4 @@ public void setStatusCode(int statusCode) { public void setRequestExecutionTime(Duration executionTime) { this.executionTime = executionTime; } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java index da9f86d5e..ed9ccf03a 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_EXCLUDED_TAGS; @@ -44,4 +52,4 @@ public Set getExcludedTagNames() { return excludedTagNames; } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java index 3020893c4..ac988819c 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import io.micrometer.core.instrument.MeterRegistry; @@ -10,4 +18,4 @@ public class MetricConstants { public static final MeterRegistry DEFAULT_REGISTRY = Metrics.globalRegistry; public static final Set DEFAULT_EXCLUDED_TAGS = EnumSet.noneOf(MetricTag.class); public static final Set DEFAULT_ADDITIONAL_METRIC_GROUPS = EnumSet.noneOf(MetricGroup.class); -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java index 108142282..4c8bfa68f 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import java.util.EnumSet; @@ -9,4 +17,4 @@ public enum MetricGroup { public static final Set REQUIRED_GROUPS = EnumSet.of(GENERAL); public static final Set ALL = EnumSet.allOf(MetricGroup.class); -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java index f6766e0f2..b72d34f83 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; public enum MetricName { @@ -19,4 +27,4 @@ public enum MetricName { public String toString() { return PREFIX + "." + metricName; } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java index 741fd77c5..2ae33f002 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_ADDITIONAL_METRIC_GROUPS; @@ -24,8 +32,8 @@ public class MetricOptions { public MetricOptions(MetricOptionsBuilder builder) { meterRegistry = builder.meterRegistry == null ? DEFAULT_REGISTRY : builder.meterRegistry; clientId = builder.clientId == null || builder.clientId.isEmpty() - ? String.valueOf(TelemetryMetricsManager.generateClientID()) - : builder.clientId; + ? String.valueOf(TelemetryMetricsManager.generateClientID()) + : builder.clientId; percentiles = builder.percentiles == null ? DEFAULT_PERCENTILES : builder.percentiles; isEnabled = builder.isEnabled; excludedTags = builder.excludedTags == null ? DEFAULT_EXCLUDED_TAGS : builder.excludedTags; @@ -138,4 +146,4 @@ public MetricOptions build() { return new MetricOptions(this); } } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java index 627984752..b857636e6 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; public enum MetricTag { @@ -17,4 +25,4 @@ public enum MetricTag { public String toString() { return name; } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java index 1aeee6822..c8c0d28b8 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import java.time.Duration; @@ -35,4 +43,4 @@ public long getRequestPayloadSize() { public void setRequestPayloadSize(long requestPayloadSize) { this.requestPayloadSize = requestPayloadSize; } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java index bf8f3c0e0..6d22ec7cd 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import java.util.ArrayList; @@ -23,4 +31,4 @@ public void addNetworkRequestContext(NetworkRequestMetricContext nodeContext) { networkRequestContexts.add(nodeContext); } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java index 89180d6e8..009439551 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.client_metrics; import static org.opensearch.client.transport.client_metrics.ExecutionMetricContext.DEFAULT_EMPTY_STATUS_CODE; @@ -25,12 +33,6 @@ import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.opensearch.client.transport.client_metrics.ExecutionMetricContext; -import org.opensearch.client.transport.client_metrics.MeterOptions; -import org.opensearch.client.transport.client_metrics.MetricGroup; -import org.opensearch.client.transport.client_metrics.MetricTag; -import org.opensearch.client.transport.client_metrics.NetworkRequestMetricContext; -import org.opensearch.client.transport.client_metrics.RequestMetricContext; public class TelemetryMetricsManager { private static final Log logger = LogFactory.getLog(TelemetryMetricsManager.class); @@ -81,10 +83,10 @@ public static synchronized void removeRegistry(MeterRegistry registry) { * @param metricGroups {@link MetricGroup} groups to record */ public static void recordRequestMetrics( - String requestName, - MeterOptions meterOptions, - RequestMetricContext context, - Set metricGroups + String requestName, + MeterOptions meterOptions, + RequestMetricContext context, + Set metricGroups ) { if (requestName == null) { throw new IllegalArgumentException("Request name cannot be null"); @@ -105,10 +107,10 @@ public static void recordRequestMetrics( Tags payloadSizeTags = Tags.of(MetricTag.REQUEST.toString(), requestName).and(meterOptions.getCommonTags()); for (NetworkRequestMetricContext networkRequestMetricContext : context.getNetworkRequestContexts()) { recordNetworkRequestMetric( - meterOptions, - networkRequestMetricContext, - getRequestTags(requestName, networkRequestMetricContext), - excludedTags + meterOptions, + networkRequestMetricContext, + getRequestTags(requestName, networkRequestMetricContext), + excludedTags ); recordPayloadSizes(networkRequestMetricContext, payloadSizeTags); } @@ -116,9 +118,9 @@ public static void recordRequestMetrics( } public static void initializeNodeGauges( - MeterOptions meterOptions, - Supplier activeNodeUpdater, - Supplier inactiveNodeUpdater + MeterOptions meterOptions, + Supplier activeNodeUpdater, + Supplier inactiveNodeUpdater ) { if (meterOptions == null) { throw new IllegalArgumentException("Meter options cannot be null"); @@ -131,15 +133,15 @@ public static void initializeNodeGauges( } Tags tags = Tags.empty().and(meterOptions.getCommonTags()); Gauge.builder(ACTIVE_NODES.toString(), activeNodeUpdater) - .description("Number of active nodes to serve traffic") - .tags(tags) - .baseUnit("nodes") - .register(mainRegistry); + .description("Number of active nodes to serve traffic") + .tags(tags) + .baseUnit("nodes") + .register(mainRegistry); Gauge.builder(INACTIVE_NODES.toString(), inactiveNodeUpdater) - .description("Number of inactive nodes that cannot serve traffic") - .tags(tags) - .baseUnit("nodes") - .register(mainRegistry); + .description("Number of inactive nodes that cannot serve traffic") + .tags(tags) + .baseUnit("nodes") + .register(mainRegistry); } protected static void recordPayloadSizes(NetworkRequestMetricContext context, Tags tags) { @@ -154,61 +156,61 @@ protected static void recordPayloadSizes(NetworkRequestMetricContext context, Ta long responsePayloadSize = context.getResponsePayloadSize(); if (requestPayloadSize > -1) { DistributionSummary.builder(REQUEST_PAYLOAD_SIZE.toString()) - .tags(tags) - .description("Request payload size") - .baseUnit("bytes") - .register(mainRegistry) - .record(requestPayloadSize); + .tags(tags) + .description("Request payload size") + .baseUnit("bytes") + .register(mainRegistry) + .record(requestPayloadSize); } if (responsePayloadSize > -1) { DistributionSummary.builder(RESPONSE_PAYLOAD_SIZE.toString()) - .tags(tags) - .description("Response payload size") - .baseUnit("bytes") - .register(mainRegistry) - .record(responsePayloadSize); + .tags(tags) + .description("Response payload size") + .baseUnit("bytes") + .register(mainRegistry) + .record(responsePayloadSize); } } private static void recordNetworkRequestMetric( - MeterOptions meterOptions, - NetworkRequestMetricContext context, - Tags requestTags, - Set excludedTags + MeterOptions meterOptions, + NetworkRequestMetricContext context, + Tags requestTags, + Set excludedTags ) { if (context.getRequestExecutionTime() != null) { Tags networkRequestTags = excludedTags.contains(MetricTag.HOST) - ? requestTags - : requestTags.and(Tag.of(MetricTag.HOST.toString(), context.getHostName())); + ? requestTags + : requestTags.and(Tag.of(MetricTag.HOST.toString(), context.getHostName())); Timer.builder(NETWORK_REQUEST.toString()) - .description("Apache HttpClient request latency") - .publishPercentiles(meterOptions.getPercentiles()) - .tags(networkRequestTags.and(meterOptions.getCommonTags())) - .maximumExpectedValue(Duration.ofSeconds(30)) - .register(mainRegistry) - .record(context.getRequestExecutionTime()); + .description("Apache HttpClient request latency") + .publishPercentiles(meterOptions.getPercentiles()) + .tags(networkRequestTags.and(meterOptions.getCommonTags())) + .maximumExpectedValue(Duration.ofSeconds(30)) + .register(mainRegistry) + .record(context.getRequestExecutionTime()); } else if (logger.isDebugEnabled()) { logger.debug("Missing execution duration. Skipping " + NETWORK_REQUEST); } } private static void recordOverallRequestMetric( - MeterOptions meterOptions, - RequestMetricContext context, - Tags requiredRequestTags, - Set excludedTags + MeterOptions meterOptions, + RequestMetricContext context, + Tags requiredRequestTags, + Set excludedTags ) { if (context.getRequestExecutionTime() != null) { Tags requestMeterTags = excludedTags.contains(MetricTag.HOST_CONTACTED) - ? requiredRequestTags - : requiredRequestTags.and(Tag.of(MetricTag.HOST_CONTACTED.toString(), context.getContactedHosts())); + ? requiredRequestTags + : requiredRequestTags.and(Tag.of(MetricTag.HOST_CONTACTED.toString(), context.getContactedHosts())); Timer.builder(REQUEST.toString()) - .description("End-to-end request execution latency") - .publishPercentiles(meterOptions.getPercentiles()) - .tags(requestMeterTags.and(meterOptions.getCommonTags())) - .maximumExpectedValue(Duration.ofSeconds(30)) - .register(mainRegistry) - .record(context.getRequestExecutionTime()); + .description("End-to-end request execution latency") + .publishPercentiles(meterOptions.getPercentiles()) + .tags(requestMeterTags.and(meterOptions.getCommonTags())) + .maximumExpectedValue(Duration.ofSeconds(30)) + .register(mainRegistry) + .record(context.getRequestExecutionTime()); } else if (logger.isDebugEnabled()) { logger.debug("Missing execution duration. Skipping " + REQUEST); } @@ -247,4 +249,4 @@ private static String extractStatusCodeOrException(ExecutionMetricContext contex } return "UNKNOWN"; } -} \ No newline at end of file +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java index 2adfc51fe..aad8e1314 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -8,6 +8,10 @@ package org.opensearch.client.transport.httpclient5; +import static org.opensearch.client.transport.client_metrics.ExecutionMetricContext.DEFAULT_EMPTY_STATUS_CODE; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; +import static org.opensearch.client.transport.client_metrics.MetricTag.CLIENT_ID; + import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import jakarta.json.stream.JsonGenerator; @@ -111,10 +115,6 @@ import org.opensearch.client.transport.httpclient5.internal.NodeSelector; import org.opensearch.client.util.MissingRequiredPropertyException; -import static org.opensearch.client.transport.client_metrics.ExecutionMetricContext.DEFAULT_EMPTY_STATUS_CODE; -import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; -import static org.opensearch.client.transport.client_metrics.MetricTag.CLIENT_ID; - /** * Apache HttpClient 5 based client transport. */ @@ -295,11 +295,11 @@ public void completed(ClassicHttpResponse httpResponse) { long endTime = System.currentTimeMillis(); try { prepareRequestMetricContext( - ((OpenSearchRequestFuture) listener).getContext(), - request, - httpResponse, - hostName, - endTime - startTime + ((OpenSearchRequestFuture) listener).getContext(), + request, + httpResponse, + hostName, + endTime - startTime ); ResponseOrResponseException responseOrResponseException = convertResponse( request, @@ -326,14 +326,14 @@ public void failed(Exception failure) { long endTime = System.currentTimeMillis(); try { ((OpenSearchRequestFuture) listener).getContext() - .addNetworkRequestContext( - new NetworkRequestMetricContext( - hostName, - failure, - DEFAULT_EMPTY_STATUS_CODE, - Duration.ofMillis(endTime - startTime) - ) - ); + .addNetworkRequestContext( + new NetworkRequestMetricContext( + hostName, + failure, + DEFAULT_EMPTY_STATUS_CODE, + Duration.ofMillis(endTime - startTime) + ) + ); onFailure(context.node); if (nodeTuple.nodes.hasNext()) { performRequestAsync(nodeTuple, options, request, warningsHandler, listener); @@ -349,14 +349,14 @@ public void failed(Exception failure) { public void cancelled() { CancellationException cancellationException = new CancellationException("request was cancelled"); ((OpenSearchRequestFuture) listener).getContext() - .addNetworkRequestContext( - new NetworkRequestMetricContext( - hostName, - cancellationException, - DEFAULT_EMPTY_STATUS_CODE, - Duration.ofMillis(System.currentTimeMillis() - startTime) - ) - ); + .addNetworkRequestContext( + new NetworkRequestMetricContext( + hostName, + cancellationException, + DEFAULT_EMPTY_STATUS_CODE, + Duration.ofMillis(System.currentTimeMillis() - startTime) + ) + ); listener.completeExceptionally(cancellationException); } } @@ -368,17 +368,17 @@ public void cancelled() { } private void prepareRequestMetricContext( - RequestMetricContext requestMetricContext, - HttpUriRequestBase request, - ClassicHttpResponse httpResponse, - String hostName, - long executionTimeMS + RequestMetricContext requestMetricContext, + HttpUriRequestBase request, + ClassicHttpResponse httpResponse, + String hostName, + long executionTimeMS ) { NetworkRequestMetricContext networkRequestMetricContext = new NetworkRequestMetricContext( - hostName, - null, - httpResponse.getCode(), - Duration.ofMillis(executionTimeMS) + hostName, + null, + httpResponse.getCode(), + Duration.ofMillis(executionTimeMS) ); // Because we compress data as it is being streamed, we can't obtain compressed size efficiently if (request.getEntity() != null && !compressionEnabled) { @@ -969,6 +969,7 @@ private Supplier getInactiveNodeGaugeUpdater() { return numInactive; }; } + private static class RequestContext { private final Node node; private final AsyncRequestProducer requestProducer; diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java index 49b2f1ea6..8a8a6740e 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.client.transport.httpclient5; import java.util.concurrent.CompletableFuture; @@ -9,4 +17,4 @@ public class OpenSearchRequestFuture extends CompletableFuture { public RequestMetricContext getContext() { return context; } -} \ No newline at end of file +} diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java index 4243e6f9b..0dfe13483 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java @@ -57,16 +57,16 @@ public abstract class AbstractClientMetricsIT extends OpenSearchJavaClientTestCa public void testDefaultMetricGroup() throws IOException { TelemetryMetricsManager.addRegistry(getStubRegistry()); Future clusterHealthFuture = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(Settings.EMPTY)) - .cluster() - .health(); + .cluster() + .health(); await().atMost(2, TimeUnit.SECONDS).until(clusterHealthFuture::isDone); assertEquals( - 1, - findMeter( - getStubRegistry(), - MetricName.REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) - ).size() + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + ).size() ); assertEquals(1, findMeter(getStubRegistry(), MetricName.ACTIVE_NODES.toString(), Tags.empty()).size()); assertEquals(1, findMeter(getStubRegistry(), MetricName.INACTIVE_NODES.toString(), Tags.empty()).size()); @@ -79,64 +79,64 @@ public void testAllMetricGroups() throws IOException { AppData appData = new AppData(); appData.setMsg("testMetrics"); Future indexFuture = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)) - .index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); + .index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); await().atMost(2, TimeUnit.SECONDS).until(indexFuture::isDone); assertEquals( - 1, - findMeter( - getStubRegistry(), - MetricName.REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) - ).size() + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() ); assertEquals(1, findMeter(getStubRegistry(), MetricName.ACTIVE_NODES.toString(), Tags.empty()).size()); assertEquals(1, findMeter(getStubRegistry(), MetricName.INACTIVE_NODES.toString(), Tags.empty()).size()); assertEquals( - 1, - findMeter( - getStubRegistry(), - MetricName.NETWORK_REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) - ).size() + 1, + findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() ); assertEquals( - 1, - findMeter( - getStubRegistry(), - MetricName.RESPONSE_PAYLOAD_SIZE.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) - ).size() + 1, + findMeter( + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() ); assertEquals( - 1, - findMeter( - getStubRegistry(), - MetricName.REQUEST_PAYLOAD_SIZE.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) - ).size() + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() ); } public void testRequestMetricResponse() throws IOException { TelemetryMetricsManager.addRegistry(getStubRegistry()); Future clusterHealthFuture = getCustomAsyncClient( - getDefaultHosts(), - restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING) + getDefaultHosts(), + restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING) ).cluster().health(); await().atMost(2, TimeUnit.SECONDS).until(clusterHealthFuture::isDone); List meter = findMeter( - getStubRegistry(), - MetricName.REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) ); assertEquals(1, meter.size()); Meter clusterHealthMeter = meter.get(0); meter = findMeter( - getStubRegistry(), - MetricName.NETWORK_REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) ); assertEquals(1, meter.size()); Meter clusterHealthNetworkMeter = meter.get(0); @@ -164,9 +164,9 @@ public void testRequestMetricResponseExceptionThrown() throws IOException, Execu client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)).get(); try { CreateRequest duplicateCreateRequest = new CreateRequest.Builder().index(index) - .id(id) - .document(appData) - .build(); + .id(id) + .document(appData) + .build(); client.create(duplicateCreateRequest).get(); fail("Should have failed due to version conflict"); } catch (ExecutionException e) { @@ -178,17 +178,17 @@ public void testRequestMetricResponseExceptionThrown() throws IOException, Execu } // Check metrics List meter = findMeter( - getStubRegistry(), - MetricName.REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) ); assertEquals(1, meter.size()); Timer createMeter = (Timer) meter.get(0); meter = findMeter( - getStubRegistry(), - MetricName.NETWORK_REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) ); assertEquals(1, meter.size()); Timer createNetworkMeter = (Timer) meter.get(0); @@ -215,7 +215,7 @@ public void testRequestMetricResponseExceptionHandled() throws IOException, Inte appData.setMsg("testRequestMetricResponseExceptionHandled"); try { getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)).index( - b -> b.index(malformedIndexName).id(id).document(appData).refresh(Refresh.True) + b -> b.index(malformedIndexName).id(id).document(appData).refresh(Refresh.True) ).get(); fail("Should have failed due to invalid index name"); } catch (ExecutionException e) { @@ -227,17 +227,17 @@ public void testRequestMetricResponseExceptionHandled() throws IOException, Inte } // Check metrics List meter = findMeter( - getStubRegistry(), - MetricName.REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) ); assertEquals(1, meter.size()); Timer indexMeter = (Timer) meter.get(0); meter = findMeter( - getStubRegistry(), - MetricName.NETWORK_REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) ); assertEquals(1, meter.size()); Timer indexNetworkMeter = (Timer) meter.get(0); @@ -265,7 +265,7 @@ public void testRequestMetricExceptionThrown() throws IOException, InterruptedEx try { getCustomAsyncClient(BAD_HOSTS, restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)).index( - b -> b.index(indexName).id(id).document(appData).refresh(Refresh.True) + b -> b.index(indexName).id(id).document(appData).refresh(Refresh.True) ).get(); fail("Should have failed due to bad host"); } catch (ExecutionException e) { @@ -276,17 +276,17 @@ public void testRequestMetricExceptionThrown() throws IOException, InterruptedEx } // Check metric List meter = findMeter( - getStubRegistry(), - MetricName.REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) ); assertEquals(1, meter.size()); Timer indexMeter = (Timer) meter.get(0); meter = findMeter( - getStubRegistry(), - MetricName.NETWORK_REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) ); assertEquals(2, meter.size()); @@ -310,22 +310,22 @@ public void testNodeGauges() throws InterruptedException, IOException { TelemetryMetricsManager.addRegistry(getStubRegistry()); String clientID = "badHostClient"; OpenSearchAsyncClient client = getCustomAsyncClient( - BAD_HOSTS, - restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) + BAD_HOSTS, + restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) ); // Verify active nodes List activeNodeMeter = findMeter( - getStubRegistry(), - MetricName.ACTIVE_NODES.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + getStubRegistry(), + MetricName.ACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) ); assertEquals(1, activeNodeMeter.size()); assertEquals(2, ((Gauge) activeNodeMeter.get(0)).value(), 0); // Verify inactive nodes List inactiveNodeMeter = findMeter( - getStubRegistry(), - MetricName.INACTIVE_NODES.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + getStubRegistry(), + MetricName.INACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) ); assertEquals(1, inactiveNodeMeter.size()); assertEquals(0, ((Gauge) inactiveNodeMeter.get(0)).value(), 0); @@ -347,17 +347,17 @@ public void testNodeGauges() throws InterruptedException, IOException { // Verify active nodes activeNodeMeter = findMeter( - getStubRegistry(), - MetricName.ACTIVE_NODES.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + getStubRegistry(), + MetricName.ACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) ); assertEquals(1, activeNodeMeter.size()); assertEquals(0, ((Gauge) activeNodeMeter.get(0)).value(), 0); // Verify inactive nodes inactiveNodeMeter = findMeter( - getStubRegistry(), - MetricName.INACTIVE_NODES.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + getStubRegistry(), + MetricName.INACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) ); assertEquals(1, inactiveNodeMeter.size()); assertEquals(2, ((Gauge) inactiveNodeMeter.get(0)).value(), 0); @@ -367,8 +367,8 @@ public void testPayloadSize() throws IOException { TelemetryMetricsManager.addRegistry(getStubRegistry()); String clientID = "compressingClient"; OpenSearchAsyncClient client = getCustomAsyncClient( - Stream.concat(Arrays.stream(BAD_HOSTS), Arrays.stream(getDefaultHosts())).toArray(HttpHost[]::new), - restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) + Stream.concat(Arrays.stream(BAD_HOSTS), Arrays.stream(getDefaultHosts())).toArray(HttpHost[]::new), + restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) ); String index = "index_metrics"; String id = UUID.randomUUID().toString(); @@ -379,16 +379,16 @@ public void testPayloadSize() throws IOException { Future indexFuture = client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); await().atMost(Duration.ofSeconds(2)).until(indexFuture::isDone); DistributionSummary indexRequestPayload = (DistributionSummary) findMeter( - getStubRegistry(), - MetricName.REQUEST_PAYLOAD_SIZE.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) ).get(0); assertTrue(500 <= indexRequestPayload.max() && indexRequestPayload.max() <= 600); assertEquals(1, indexRequestPayload.count()); DistributionSummary indexResponsePayload = (DistributionSummary) findMeter( - getStubRegistry(), - MetricName.RESPONSE_PAYLOAD_SIZE.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) ).get(0); assertEquals(1, indexResponsePayload.count()); @@ -397,16 +397,16 @@ public void testPayloadSize() throws IOException { CompletableFuture> updateFuture = client.update(b -> b.doc(appData).index(index).id(id), AppData.class); await().atMost(Duration.ofSeconds(2)).until(updateFuture::isDone); DistributionSummary updateRequestPayload = (DistributionSummary) findMeter( - getStubRegistry(), - MetricName.REQUEST_PAYLOAD_SIZE.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) ).get(0); assertTrue(5 * 1024 <= updateRequestPayload.max() && updateRequestPayload.max() <= (5 * 1024 + 100)); assertEquals(1, updateRequestPayload.count()); DistributionSummary updateResponsePayload = (DistributionSummary) findMeter( - getStubRegistry(), - MetricName.RESPONSE_PAYLOAD_SIZE.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) ).get(0); assertTrue(updateResponsePayload.max() > 0); assertEquals(1, updateResponsePayload.count()); @@ -416,10 +416,10 @@ public void testNoRequestPayloadSizeCompressionEnabled() throws IOException { TelemetryMetricsManager.addRegistry(getStubRegistry()); String clientID = "compressingClient"; OpenSearchAsyncClient client = getCustomAsyncClient( - getDefaultHosts(), - restClientSettingsWithMetrics( - Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).put(COMPRESSION_ENABLED, true).build() - ) + getDefaultHosts(), + restClientSettingsWithMetrics( + Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).put(COMPRESSION_ENABLED, true).build() + ) ); String index = "index_metrics"; String id = UUID.randomUUID().toString(); @@ -428,11 +428,11 @@ public void testNoRequestPayloadSizeCompressionEnabled() throws IOException { Future indexFuture = client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); await().atMost(Duration.ofSeconds(2)).until(indexFuture::isDone); assertTrue( - findMeter( - getStubRegistry(), - MetricName.REQUEST_PAYLOAD_SIZE.toString(), - Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) - ).isEmpty() + findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).isEmpty() ); } @@ -463,4 +463,4 @@ private String generatePayload(int numBytes) { } return builder.toString(); } -} \ No newline at end of file +} diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java index 037ae201d..a61bafc41 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java @@ -179,6 +179,7 @@ protected synchronized OpenSearchAsyncClient getCustomAsyncClient(HttpHost[] hos customAsyncClients.add(customAsyncClient); return customAsyncClient; } + protected String getTestRestCluster() { String cluster = System.getProperty("tests.rest.cluster"); if (cluster == null) { @@ -190,6 +191,7 @@ protected String getTestRestCluster() { public MeterRegistry getStubRegistry() { return stubRegistry; } + @After protected void wipeAllOSIndices() throws IOException { // wipe all data streams first, otherwise deleting backing indices will encounter exception @@ -241,6 +243,7 @@ private synchronized void cleanUpCustomAsyncClients() { customAsyncClients.clear(); } } + protected Version getServerVersion() throws IOException { final InfoResponse info = javaClient().info(); diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java index a3e70dc6b..40755db01 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java @@ -32,24 +32,24 @@ public void testInitWithMetricsOptions() throws IOException { String clientID = "testClient"; HttpHost host = new HttpHost("localhost", 9200); MetricOptions metricOptions = MetricOptions.builder() - .setMetricsEnabled(true) - .setMeterRegistry(meterRegistry) - .setClientId(clientID) - .setPercentiles(0.90, 0.8, 0.5) - .build(); + .setMetricsEnabled(true) + .setMeterRegistry(meterRegistry) + .setClientId(clientID) + .setPercentiles(0.90, 0.8, 0.5) + .build(); ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(host); builder.setMetricOptions(metricOptions); try (ApacheHttpClient5Transport transport = builder.build()) { assertTrue(transport.isMetricsEnabled()); assertArrayEquals(new double[] { 0.90, 0.8, 0.5 }, transport.getMeterOptions().getPercentiles(), 0); Optional clientIDTag = transport.getMeterOptions() - .getCommonTags() - .stream() - .filter(tag -> tag.getKey().equals(MetricTag.CLIENT_ID.toString())) - .findFirst(); + .getCommonTags() + .stream() + .filter(tag -> tag.getKey().equals(MetricTag.CLIENT_ID.toString())) + .findFirst(); assertFalse(clientIDTag.isEmpty()); assertEquals(clientID, clientIDTag.get().getValue()); assertEquals(clientID, transport.getClientID()); } } -} \ No newline at end of file +} diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java index bcfbe7afb..eb7e91bf8 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java @@ -118,9 +118,9 @@ public TlsDetails create(final SSLEngine sslEngine) { } if (settings.hasValue(METRICS_ENABLED) && settings.getAsBoolean(METRICS_ENABLED, false)) { MetricOptions.MetricOptionsBuilder metricOptionsBuilder = MetricOptions.builder() - .setMeterRegistry(new SimpleMeterRegistry()) - .setPercentiles(0.95) - .setMetricsEnabled(true); + .setMeterRegistry(new SimpleMeterRegistry()) + .setPercentiles(0.95) + .setMetricsEnabled(true); if (settings.hasValue(CUSTOM_CLIENT_ID)) { metricOptionsBuilder.setClientId(settings.get(CUSTOM_CLIENT_ID)); } diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java index 38868fff9..8ae394279 100644 --- a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java @@ -19,9 +19,9 @@ public class MeterOptionsTest { @Test public void testMeterOptions() { MeterOptions meterOptions = new MeterOptions( - new double[] { 0.80, 0.85 }, - Tags.of("test1", "test2"), - EnumSet.of(MetricTag.CLIENT_ID) + new double[] { 0.80, 0.85 }, + Tags.of("test1", "test2"), + EnumSet.of(MetricTag.CLIENT_ID) ); assertArrayEquals(new double[] { 0.80, 0.85 }, meterOptions.getPercentiles(), 0); assertEquals(Tags.of("test1", "test2"), meterOptions.getCommonTags()); @@ -35,4 +35,4 @@ public void testMeterOptionsNoNull() { assertEquals(Tags.empty(), meterOptions.getCommonTags()); assertEquals(MetricConstants.DEFAULT_EXCLUDED_TAGS, meterOptions.getExcludedTagNames()); } -} \ No newline at end of file +} diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java index fa6dedf7f..c404e8986 100644 --- a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java @@ -1,4 +1,3 @@ - /* * SPDX-License-Identifier: Apache-2.0 * @@ -30,13 +29,13 @@ public void testBuildMetricOptions() { MeterRegistry meterRegistry = new SimpleMeterRegistry(); String clientID = "testClient"; MetricOptions metricOptions = MetricOptions.builder() - .setMetricsEnabled(true) - .setMeterRegistry(meterRegistry) - .setClientId(clientID) - .setPercentiles(0.90, 0.8, 0.5) - .setExcludedTags(MetricTag.HOST_CONTACTED, MetricTag.STATUS_CODE_OR_EXCEPTION) - .setAdditionalMetricGroups(MetricGroup.NETWORK_DETAILS) - .build(); + .setMetricsEnabled(true) + .setMeterRegistry(meterRegistry) + .setClientId(clientID) + .setPercentiles(0.90, 0.8, 0.5) + .setExcludedTags(MetricTag.HOST_CONTACTED, MetricTag.STATUS_CODE_OR_EXCEPTION) + .setAdditionalMetricGroups(MetricGroup.NETWORK_DETAILS) + .build(); assertTrue(metricOptions.isMetricsEnabled()); assertEquals(meterRegistry, metricOptions.getMeterRegistry()); assertEquals(clientID, metricOptions.getClientId()); @@ -78,4 +77,4 @@ private void validateDefaults(MetricOptions metricOptions) { assertEquals(DEFAULT_EXCLUDED_TAGS, metricOptions.getExcludedTags()); assertEquals(DEFAULT_ADDITIONAL_METRIC_GROUPS, metricOptions.getMetricGroups()); } -} \ No newline at end of file +} diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java index 5298c02a2..548257349 100644 --- a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java @@ -61,7 +61,7 @@ public void testRecordingRequestMetric() { context.setRequestExecutionTime(totalExecLatency); context.setStatusCode(200); context.addNetworkRequestContext( - new NetworkRequestMetricContext("localhost", new HttpConnectTimeoutException("error"), -1, errorLatency) + new NetworkRequestMetricContext("localhost", new HttpConnectTimeoutException("error"), -1, errorLatency) ); context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost2", null, 200, totalExecLatency.minus(errorLatency))); @@ -80,9 +80,9 @@ public void testRecordingRequestMetric() { // Verify NETWORK_REQUEST meter List meters = findMeters( - stubRegistry, - MetricName.NETWORK_REQUEST.toString(), - Tags.of(MetricTag.REQUEST.toString(), requestName) + stubRegistry, + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), requestName) ); Set hosts = new HashSet<>(Arrays.asList("localhost", "localhost2")); assertEquals(2, meters.size()); @@ -101,9 +101,9 @@ public void testRecordingRequestMetric() { } else { assertEquals("200", status); assertEquals( - totalExecLatency.minus(errorLatency).toMillis(), - ((Timer) networkRequestMeter).totalTime(TimeUnit.MILLISECONDS), - 0 + totalExecLatency.minus(errorLatency).toMillis(), + ((Timer) networkRequestMeter).totalTime(TimeUnit.MILLISECONDS), + 0 ); } } @@ -118,7 +118,7 @@ public void testRecordingRequestMetricWithExceptionNoStatus() { context.setRequestExecutionTime(Duration.ofMillis(50)); context.setThrowable(new IOException("IO Errors")); context.addNetworkRequestContext( - new NetworkRequestMetricContext("localhost", new IOException("IO Errors"), -1, Duration.ofMillis(1)) + new NetworkRequestMetricContext("localhost", new IOException("IO Errors"), -1, Duration.ofMillis(1)) ); TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); @@ -151,7 +151,7 @@ public void testRecordingRequestMetricWithBothStatusAndException() { context.setStatusCode(409); context.setThrowable(new TransportException("Error")); context.addNetworkRequestContext( - new NetworkRequestMetricContext("localhost", new TransportException("Error"), 409, Duration.ofMillis(1)) + new NetworkRequestMetricContext("localhost", new TransportException("Error"), 409, Duration.ofMillis(1)) ); TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); @@ -180,9 +180,9 @@ public void testRecordingRequestMetricWithTagExclusion() { String requestName = "testRequest"; RequestMetricContext context = new RequestMetricContext(); MeterOptions meterOptions = new MeterOptions( - new double[] { 0.80, 0.85 }, - Tags.of("CommonTag", "CommonTagValue"), - EnumSet.of(MetricTag.HOST_CONTACTED, MetricTag.HOST) + new double[] { 0.80, 0.85 }, + Tags.of("CommonTag", "CommonTagValue"), + EnumSet.of(MetricTag.HOST_CONTACTED, MetricTag.HOST) ); context.setRequestExecutionTime(Duration.ofMillis(150)); context.setStatusCode(409); @@ -190,7 +190,7 @@ public void testRecordingRequestMetricWithTagExclusion() { context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost", null, 409, Duration.ofMillis(1))); context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost1", null, 409, Duration.ofMillis(1))); context.addNetworkRequestContext( - new NetworkRequestMetricContext("localhost2", new TransportException("Error"), 409, Duration.ofMillis(1)) + new NetworkRequestMetricContext("localhost2", new TransportException("Error"), 409, Duration.ofMillis(1)) ); TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); @@ -228,18 +228,18 @@ public void testMultipleNetworkContextsWithServerResponses() { MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); NetworkRequestMetricContext networkRequestMetricContextFail = new NetworkRequestMetricContext( - "host1", - null, - 500, - Duration.ofMillis(10) + "host1", + null, + 500, + Duration.ofMillis(10) ); networkRequestMetricContextFail.setRequestPayloadSize(500); NetworkRequestMetricContext networkRequestMetricContextSuccess = new NetworkRequestMetricContext( - "host1", - null, - 200, - Duration.ofMillis(20) + "host1", + null, + 200, + Duration.ofMillis(20) ); networkRequestMetricContextSuccess.setRequestPayloadSize(500); networkRequestMetricContextSuccess.setResponsePayloadSize(200); @@ -252,15 +252,15 @@ public void testMultipleNetworkContextsWithServerResponses() { TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, requestMetricContext, MetricGroup.ALL); DistributionSummary requestPayloadMeter = (DistributionSummary) findMeter( - stubRegistry, - MetricName.REQUEST_PAYLOAD_SIZE.toString(), - Tags.empty() + stubRegistry, + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.empty() ).get(); assertEquals(2, requestPayloadMeter.count()); DistributionSummary responsePayloadMeter = (DistributionSummary) findMeter( - stubRegistry, - MetricName.RESPONSE_PAYLOAD_SIZE.toString(), - Tags.empty() + stubRegistry, + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.empty() ).get(); assertEquals(1, responsePayloadMeter.count()); } @@ -285,4 +285,4 @@ private List findMeters(MeterRegistry registry, String meterName, Tags ta private Optional findMeter(MeterRegistry registry, String meterName, Tags tags) { return findMeters(registry, meterName, tags).stream().findFirst(); } -} \ No newline at end of file +} From 7f8aa0e22af733f069bd1238f23c7c6e434d0cb1 Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 20 May 2025 12:00:00 +0530 Subject: [PATCH 05/11] fixing test cases Signed-off-by: psingh3 --- .../transport/httpclient5/ApacheHttpClient5Transport.java | 2 +- .../client/opensearch/integTest/AbstractClientMetricsIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java index aad8e1314..77b734623 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -263,7 +263,7 @@ public String getClientID() { return clientID; } - protected MeterOptions getMeterOptions() { + public MeterOptions getMeterOptions() { return meterOptions; } diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java index 0dfe13483..37b7362ca 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java @@ -30,7 +30,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hc.client5.http.HttpHostConnectException; -import org.apache.http.HttpHost; +import org.apache.hc.core5.http.HttpHost; import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.opensearch._types.Refresh; From dc72cb56ac64b06a0f3524c64c7e35da75afc648 Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 20 May 2025 14:27:02 +0530 Subject: [PATCH 06/11] upgraded micrometer dependency to 1.13.13 Signed-off-by: psingh3 --- java-client/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index adc4a67f4..e5625fd3f 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -248,7 +248,7 @@ dependencies { } // Micrometer - implementation("io.micrometer:micrometer-core:1.12.3") + implementation("io.micrometer:micrometer-core:1.13.13") // Awaitility testImplementation("org.awaitility:awaitility:4.2.0") From e4608dfd10c52139076256e8d8125743d7f5e585 Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 27 May 2025 13:33:07 +0530 Subject: [PATCH 07/11] updated CHANGELOG.md Signed-off-by: psingh3 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92d4aaf33..56954ba12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Added - Metrics support includes micrometer integration and Prometheus support with a custom client-side metrics. ### Dependencies -- Added micrometer dependency `io.micrometer:micrometer-core` version 1.12.3 +- Added micrometer dependency `io.micrometer:micrometer-core` version 1.13.13 ### Changed From bc4a27353b0d1339ea6a040379f028f6dd2356c9 Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 3 Jun 2025 14:56:58 +0530 Subject: [PATCH 08/11] Added a metrics.md file to explain the metrics and their uses. Signed-off-by: psingh3 --- guides/metrics.md | 75 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 guides/metrics.md diff --git a/guides/metrics.md b/guides/metrics.md new file mode 100644 index 000000000..246ba0204 --- /dev/null +++ b/guides/metrics.md @@ -0,0 +1,75 @@ +- [SDK Metrics](#SDK-Metrics) + - [How to enable metrics](#get-client) + - [Metrics Collected](#metrics-collected) + +# SDK Metrics + +We've integrated a robust metrics solution into the OpenSearch Java client to provide comprehensive insights into its API usage and performance. This includes the collection of vital operational metrics, such as overall throughput, request latency, and error rate. Furthermore, we're capturing more granular details like request and response payload sizes, distinct success and failure rates for operations, and real-time OpenSearch node status to provide a holistic view of client behavior and cluster health. + +## How to enable metrics + +We should create a MetricOptions instance and set it in the ApacheHttpClient5TransportBuilder when creating the client. Take a look at below code snippet for an example of how to create a client with metrics enabled: + +```java +public class CreateClient { + public static OpenSearchClient createClientWithMetrics() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + var env = System.getenv(); + var https = Boolean.parseBoolean(env.getOrDefault("HTTPS", "true")); + var hostname = env.getOrDefault("HOST", "localhost"); + var port = Integer.parseInt(env.getOrDefault("PORT", "9200")); + var user = env.getOrDefault("USERNAME", "admin"); + var pass = env.getOrDefault("PASSWORD", "admin"); + var metricEnabled = true; + double PERCENTILE_99 = 0.99; + double PERCENTILE_95 = 0.95; + var meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + + final var hosts = new HttpHost[]{new HttpHost(https ? "https" : "http", hostname, port)}; + + final var sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build(); + + MetricOptions metricOptions = MetricOptions.builder() + .setMetricsEnabled(metricEnabled) // required to turn metrics on/off + .setMeterRegistry(meterRegistry) + .setPercentiles(PERCENTILE_99, PERCENTILE_95) + .setAdditionalMetricGroups(MetricGroup.NETWORK_DETAILS) + .setExcludedTags(MetricTag.HOST_CONTACTED) + .build(); + + final var transport = ApacheHttpClient5TransportBuilder.builder(hosts) + .setMapper(new JacksonJsonpMapper()) + .setMetricOptions(metricOptions) + .setHttpClientConfigCallback(httpClientBuilder -> { + final var credentialsProvider = new BasicCredentialsProvider(); + for (final var host : hosts) { + credentialsProvider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, pass.toCharArray())); + } + +// Disable SSL/TLS verification as our local testing clusters use self-signed certificates + final var tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(sslContext) + .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .build(); + + final var connectionManager = PoolingAsyncClientConnectionManagerBuilder.create().setTlsStrategy(tlsStrategy).build(); + + return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager); + }) + .build(); + return new OpenSearchClient(transport); + } +} +``` + +## Metrics Collected + +The OpenSearch Java client collects a variety of metrics to provide insights into its operations. Below is a summary of the key metrics collected, along with their descriptions and important dimensions, such as status code and request type, that can be used for filtering and analysis. + +| Metric | Description | Important Dimensions(tags) | +|---------------------------------------|-----------------------------------------------------------------|--------------------------------| +| os.client.request.seconds | End-to-end request execution latency | StatusCodeOrException, Request | +| os.client.request.seconds.count | request throughput and error rate based on status tags | StatusCodeOrException, Request | +| os.client.request.payload.size.bytes | Request payload size in bytes | Request | +| os.client.response.payload.size.bytes | Response payload size in bytes | Request | +| os.client.active.nodes | Number of OpenSearch active nodes from a client's perspective | | +| os.client.inactive.nodes | Number of OpenSearch inactive nodes from a client's perspective | | \ No newline at end of file From c9db41244ce73b6b7cf68cc1a533c330718bbb65 Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 3 Jun 2025 14:59:28 +0530 Subject: [PATCH 09/11] updated the user guide Signed-off-by: psingh3 --- USER_GUIDE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/USER_GUIDE.md b/USER_GUIDE.md index dfe1399bf..11dceb247 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -200,6 +200,7 @@ You can find a working sample of the above code in [IndexingBasics.java](./sampl - [Search](./guides/search.md) - [Generic Client](./guides/generic.md) - [Json](./guides/json.md) +- [Metrics](./guides/metrics.md) ## Plugins From 79fbdfe92dd8c95a263ecb4ccaac4f6360b00316 Mon Sep 17 00:00:00 2001 From: psingh3 Date: Tue, 3 Jun 2025 15:03:32 +0530 Subject: [PATCH 10/11] updated changelog Signed-off-by: psingh3 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fcfecfe95..c0b54faee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [Unreleased 3.x] ### Added -- Metrics support includes micrometer integration and Prometheus support with a custom client-side metrics. +- Metrics support includes micrometer integration and Prometheus support with a custom client-side metrics. [Metrics](./guides/metrics.md) ### Dependencies - Bump `org.apache.httpcomponents.client5:httpclient5` from 5.4.4 to 5.5 ([#1578](https://github.com/opensearch-project/opensearch-java/pull/1578)) - Added micrometer dependency `io.micrometer:micrometer-core` version 1.13.13 From 7bda4120cbdaa9caae1c940a72f2deae38e965fc Mon Sep 17 00:00:00 2001 From: psingh3 Date: Wed, 4 Jun 2025 09:44:22 +0530 Subject: [PATCH 11/11] fixing review comments Signed-off-by: psingh3 --- CHANGELOG.md | 2 +- guides/metrics.md | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5384ad016..900ad36d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [Unreleased 3.x] ### Added -- Metrics support includes micrometer integration and Prometheus support with a custom client-side metrics. [Metrics](./guides/metrics.md) +- Metrics support includes Micrometer integration for custom client-side metrics. [Metrics](./guides/metrics.md) ### Dependencies - Bump `org.apache.httpcomponents.client5:httpclient5` from 5.4.4 to 5.5 ([#1578](https://github.com/opensearch-project/opensearch-java/pull/1578)) - Bump `org.junit:junit-bom` from 5.12.2 to 5.13.0 ([#1587](https://github.com/opensearch-project/opensearch-java/pull/1587)) diff --git a/guides/metrics.md b/guides/metrics.md index 246ba0204..c2a92b695 100644 --- a/guides/metrics.md +++ b/guides/metrics.md @@ -6,9 +6,11 @@ We've integrated a robust metrics solution into the OpenSearch Java client to provide comprehensive insights into its API usage and performance. This includes the collection of vital operational metrics, such as overall throughput, request latency, and error rate. Furthermore, we're capturing more granular details like request and response payload sizes, distinct success and failure rates for operations, and real-time OpenSearch node status to provide a holistic view of client behavior and cluster health. +The implementation utilizes [Micrometer](https://mvnrepository.com/artifact/io.micrometer/micrometer-core), which supports integrations with various popular monitoring systems ([see their overview](https://docs.micrometer.io/micrometer/reference/implementations.html)). This enables seamless collection and export of metrics to tools like Prometheus and Grafana, allowing real-time monitoring of the OpenSearch client's performance and health. + ## How to enable metrics -We should create a MetricOptions instance and set it in the ApacheHttpClient5TransportBuilder when creating the client. Take a look at below code snippet for an example of how to create a client with metrics enabled: +We should create a `MetricOptions` instance and set it in the `ApacheHttpClient5TransportBuilder` when creating the client. Take a look at below code snippet for an example of how to create a client with metrics enabled: ```java public class CreateClient { @@ -19,10 +21,10 @@ public class CreateClient { var port = Integer.parseInt(env.getOrDefault("PORT", "9200")); var user = env.getOrDefault("USERNAME", "admin"); var pass = env.getOrDefault("PASSWORD", "admin"); - var metricEnabled = true; + var metricsEnabled = true; double PERCENTILE_99 = 0.99; double PERCENTILE_95 = 0.95; - var meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + var meterRegistry = new SimpleMeterRegistry(); final var hosts = new HttpHost[]{new HttpHost(https ? "https" : "http", hostname, port)}; @@ -65,11 +67,11 @@ public class CreateClient { The OpenSearch Java client collects a variety of metrics to provide insights into its operations. Below is a summary of the key metrics collected, along with their descriptions and important dimensions, such as status code and request type, that can be used for filtering and analysis. -| Metric | Description | Important Dimensions(tags) | -|---------------------------------------|-----------------------------------------------------------------|--------------------------------| -| os.client.request.seconds | End-to-end request execution latency | StatusCodeOrException, Request | -| os.client.request.seconds.count | request throughput and error rate based on status tags | StatusCodeOrException, Request | -| os.client.request.payload.size.bytes | Request payload size in bytes | Request | -| os.client.response.payload.size.bytes | Response payload size in bytes | Request | -| os.client.active.nodes | Number of OpenSearch active nodes from a client's perspective | | -| os.client.inactive.nodes | Number of OpenSearch inactive nodes from a client's perspective | | \ No newline at end of file +| Metric | Description | Important Dimensions(tags) | +|-----------------------------------------|-----------------------------------------------------------------|------------------------------------| +| `os.client.request.seconds` | End-to-end request execution latency | `StatusCodeOrException`, `Request` | +| `os.client.request.seconds.count` | request throughput and error rate based on status tags | `StatusCodeOrException`, `Request` | +| `os.client.request.payload.size.bytes` | Request payload size in bytes | `Request` | +| `os.client.response.payload.size.bytes` | Response payload size in bytes | `Request` | +| `os.client.active.nodes` | Number of OpenSearch active nodes from a client's perspective | | +| `os.client.inactive.nodes` | Number of OpenSearch inactive nodes from a client's perspective | | \ No newline at end of file