From ac188bffc13c85a796be90ab07ac17a40f961092 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 8 Jul 2024 18:33:50 -0700 Subject: [PATCH 01/15] [bug fix] fix incorrect coordinator node search resource usages Signed-off-by: Chenyang Ji --- .../core/listener/QueryInsightsListener.java | 205 ++++++++++++++++++ .../cluster/service/ClusterService.java | 20 ++ .../main/java/org/opensearch/node/Node.java | 1 + .../tasks/TaskResourceTrackingService.java | 3 + 4 files changed, 229 insertions(+) create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java new file mode 100644 index 0000000000000..63ed7a51da896 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -0,0 +1,205 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.action.search.SearchTask; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.tasks.Task; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting; + +/** + * The listener for query insights services. + * It forwards query-related data to the appropriate query insights stores, + * either for each request or for each phase. + * + * @opensearch.internal + */ +public final class QueryInsightsListener extends SearchRequestOperationsListener { + private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + + private static final Logger log = LogManager.getLogger(QueryInsightsListener.class); + + private final QueryInsightsService queryInsightsService; + private final ClusterService clusterService; + + /** + * Constructor for QueryInsightsListener + * + * @param clusterService The Node's cluster service. + * @param queryInsightsService The topQueriesByLatencyService associated with this listener + */ + @Inject + public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { + this.clusterService = clusterService; + this.queryInsightsService = queryInsightsService; + // Setting endpoints set up for top n queries, including enabling top n queries, window size and top n size + // Expected metricTypes are Latency, CPU and Memory. + for (MetricType type : MetricType.allMetricTypes()) { + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(getTopNEnabledSetting(type), v -> this.setEnableTopQueries(type, v)); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + getTopNSizeSetting(type), + v -> this.queryInsightsService.setTopNSize(type, v), + v -> this.queryInsightsService.validateTopNSize(type, v) + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + getTopNWindowSizeSetting(type), + v -> this.queryInsightsService.setWindowSize(type, v), + v -> this.queryInsightsService.validateWindowSize(type, v) + ); + + this.setEnableTopQueries(type, clusterService.getClusterSettings().get(getTopNEnabledSetting(type))); + this.queryInsightsService.validateTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type))); + this.queryInsightsService.setTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type))); + this.queryInsightsService.validateWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type))); + this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type))); + } + } + + /** + * Enable or disable top queries insights collection for {@link MetricType} + * This function will enable or disable the corresponding listeners + * and query insights services. + * + * @param metricType {@link MetricType} + * @param enabled boolean + */ + public void setEnableTopQueries(final MetricType metricType, final boolean enabled) { + boolean isAllMetricsDisabled = !queryInsightsService.isEnabled(); + this.queryInsightsService.enableCollection(metricType, enabled); + if (!enabled) { + // disable QueryInsightsListener only if all metrics collections are disabled now. + if (!queryInsightsService.isEnabled()) { + super.setEnabled(false); + this.queryInsightsService.stop(); + } + } else { + super.setEnabled(true); + // restart QueryInsightsListener only if none of metrics collections is enabled before. + if (isAllMetricsDisabled) { + this.queryInsightsService.stop(); + this.queryInsightsService.start(); + } + } + + } + + @Override + public boolean isEnabled() { + return super.isEnabled(); + } + + @Override + public void onPhaseStart(SearchPhaseContext context) {} + + @Override + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + constructSearchQueryRecord(context, searchRequestContext); + } + + @Override + public void onRequestFailure(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + constructSearchQueryRecord(context, searchRequestContext); + } + + private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + SearchTask searchTask = context.getTask(); + List tasksResourceUsages = searchRequestContext.getPhaseResourceUsage(); + if (clusterService.getTaskResourceTrackingService() != null) { + clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask); + } + tasksResourceUsages.add( + new TaskResourceInfo( + searchTask.getAction(), + searchTask.getId(), + searchTask.getParentTaskId().getId(), + clusterService.localNode().getId(), + searchTask.getTotalResourceStats() + ) + ); + + final SearchRequest request = context.getRequest(); + try { + Map measurements = new HashMap<>(); + if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) { + measurements.put( + MetricType.LATENCY, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) + ); + } + if (queryInsightsService.isCollectionEnabled(MetricType.CPU)) { + measurements.put( + MetricType.CPU, + tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum() + ); + } + if (queryInsightsService.isCollectionEnabled(MetricType.MEMORY)) { + measurements.put( + MetricType.MEMORY, + tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum() + ); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS)); + attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); + attributes.put(Attribute.INDICES, request.indices()); + attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); + attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages); + + Map labels = new HashMap<>(); + // Retrieve user provided label if exists + String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID); + if (userProvidedLabel != null) { + labels.put(Task.X_OPAQUE_ID, userProvidedLabel); + } + attributes.put(Attribute.LABELS, labels); + // construct SearchQueryRecord from attributes and measurements + SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); + queryInsightsService.addRecord(record); + } catch (Exception e) { + log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); + } + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index c3c48dd8b87ef..4ece885a55b70 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -54,6 +54,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexingPressureService; import org.opensearch.node.Node; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.threadpool.ThreadPool; @@ -92,6 +93,7 @@ public class ClusterService extends AbstractLifecycleComponent { private RerouteService rerouteService; private IndexingPressureService indexingPressureService; + private TaskResourceTrackingService taskResourceTrackingService; public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); @@ -265,6 +267,24 @@ public IndexingPressureService getIndexingPressureService() { return indexingPressureService; } + /** + * Getter for {@link TaskResourceTrackingService}, This method exposes task level resource usage for other components to use. + * + * @return TaskResourceTrackingService + */ + public TaskResourceTrackingService getTaskResourceTrackingService() { + return taskResourceTrackingService; + } + + /** + * Setter for {@link TaskResourceTrackingService} + * + * @param taskResourceTrackingService taskResourceTrackingService + */ + public void setTaskResourceTrackingService(TaskResourceTrackingService taskResourceTrackingService) { + this.taskResourceTrackingService = taskResourceTrackingService; + } + public ClusterApplierService getClusterApplierService() { return clusterApplierService; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d91b2a45a48c6..798c95ed73dea 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1122,6 +1122,7 @@ protected Node( clusterService.getClusterSettings(), threadPool ); + clusterService.setTaskResourceTrackingService(taskResourceTrackingService); final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( settings, diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index ca1957cdb1633..ea62093d0c893 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -16,6 +16,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -51,6 +52,7 @@ /** * Service that helps track resource usage of tasks running on a node. */ +@PublicApi(since = "2.15.0") @SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes") public class TaskResourceTrackingService implements RunnableTaskExecutionListener { @@ -357,6 +359,7 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() { /** * Listener that gets invoked when a task execution completes. */ + @PublicApi(since = "2.15.0") public interface TaskCompletionListener { void onTaskCompleted(Task task); } From e5081ff6ac72609eb8e08dd35206374874f5034e Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 9 Jul 2024 00:22:51 -0700 Subject: [PATCH 02/15] fix bug on serialization when passing task resource usage to coordinator Signed-off-by: Chenyang Ji --- .../insights/rules/model/Attribute.java | 152 ++++++++++++++ .../rules/model/SearchQueryRecord.java | 187 ++++++++++++++++++ .../tasks/TaskResourceTrackingService.java | 4 +- 3 files changed, 341 insertions(+), 2 deletions(-) create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java new file mode 100644 index 0000000000000..80c80fb6b6937 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.apache.lucene.util.ArrayUtil; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * Valid attributes for a search query record + * + * @opensearch.internal + */ +public enum Attribute { + /** + * The search query type + */ + SEARCH_TYPE, + /** + * The search query source + */ + SOURCE, + /** + * Total shards queried + */ + TOTAL_SHARDS, + /** + * The indices involved + */ + INDICES, + /** + * The per phase level latency map for a search query + */ + PHASE_LATENCY_MAP, + /** + * The node id for this request + */ + NODE_ID, + /** + * Tasks level resource usages in this request + */ + TASK_RESOURCE_USAGES, + /** + * Custom search request labels + */ + LABELS; + + /** + * Read an Attribute from a StreamInput + * + * @param in the StreamInput to read from + * @return Attribute + * @throws IOException IOException + */ + static Attribute readFromStream(final StreamInput in) throws IOException { + return Attribute.valueOf(in.readString().toUpperCase(Locale.ROOT)); + } + + /** + * Write Attribute to a StreamOutput + * + * @param out the StreamOutput to write + * @param attribute the Attribute to write + * @throws IOException IOException + */ + static void writeTo(final StreamOutput out, final Attribute attribute) throws IOException { + out.writeString(attribute.toString()); + } + + /** + * Write Attribute value to a StreamOutput + * @param out the StreamOutput to write + * @param attributeValue the Attribute value to write + */ + @SuppressWarnings("unchecked") + public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException { + if (attributeValue instanceof List) { + out.writeList((List) attributeValue); + } else { + out.writeGenericValue(attributeValue); + } + } + + /** + * Read attribute value from the input stream given the Attribute type + * + * @param in the {@link StreamInput} input to read + * @param attribute attribute type to differentiate between Source and others + * @return parse value + * @throws IOException IOException + */ + public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException { + if (attribute == Attribute.TASK_RESOURCE_USAGES) { + return in.readList(TaskResourceInfo::readFromStream); + } else { + return in.readGenericValue(); + } + } + + /** + * Read attribute map from the input stream + * + * @param in the {@link StreamInput} to read + * @return parsed attribute map + * @throws IOException IOException + */ + public static Map readAttributeMap(StreamInput in) throws IOException { + int size = readArraySize(in); + if (size == 0) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(size); + + for (int i = 0; i < size; i++) { + Attribute key = readFromStream(in); + Object value = readAttributeValue(in, key); + map.put(key, value); + } + return map; + } + + private static int readArraySize(StreamInput in) throws IOException { + final int arraySize = in.readVInt(); + if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) { + throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize); + } + if (arraySize < 0) { + throw new NegativeArraySizeException("array size must be positive but was: " + arraySize); + } + return arraySize; + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java new file mode 100644 index 0000000000000..a6e6b4a9051f0 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -0,0 +1,187 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * SearchQueryRecord represents a minimal atomic record stored in the Query Insight Framework, + * which contains extensive information related to a search query. + * + * @opensearch.internal + */ +public class SearchQueryRecord implements ToXContentObject, Writeable { + private final long timestamp; + private final Map measurements; + private final Map attributes; + + /** + * Constructor of SearchQueryRecord + * + * @param in the StreamInput to read the SearchQueryRecord from + * @throws IOException IOException + * @throws ClassCastException ClassCastException + */ + public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { + this.timestamp = in.readLong(); + measurements = new HashMap<>(); + in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) + .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); + this.attributes = Attribute.readAttributeMap(in); + } + + /** + * Constructor of SearchQueryRecord + * + * @param timestamp The timestamp of the query. + * @param measurements A list of Measurement associated with this query + * @param attributes A list of Attributes associated with this query + */ + public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { + if (measurements == null) { + throw new IllegalArgumentException("Measurements cannot be null"); + } + this.measurements = measurements; + this.attributes = attributes; + this.timestamp = timestamp; + } + + /** + * Returns the observation time of the metric. + * + * @return the observation time in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Returns the measurement associated with the specified name. + * + * @param name the name of the measurement + * @return the measurement object, or null if not found + */ + public Number getMeasurement(final MetricType name) { + return measurements.get(name); + } + + /** + * Returns a map of all the measurements associated with the metric. + * + * @return a map of measurement names to measurement objects + */ + public Map getMeasurements() { + return measurements; + } + + /** + * Returns a map of the attributes associated with the metric. + * + * @return a map of attribute keys to attribute values + */ + public Map getAttributes() { + return attributes; + } + + /** + * Add an attribute to this record + * + * @param attribute attribute to add + * @param value the value associated with the attribute + */ + public void addAttribute(final Attribute attribute, final Object value) { + attributes.put(attribute, value); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field("timestamp", timestamp); + for (Map.Entry entry : attributes.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + for (Map.Entry entry : measurements.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + return builder.endObject(); + } + + /** + * Write a SearchQueryRecord to a StreamOutput + * + * @param out the StreamOutput to write + * @throws IOException IOException + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeLong(timestamp); + out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); + out.writeMap( + attributes, + (stream, attribute) -> Attribute.writeTo(out, attribute), + (stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue) + ); + } + + /** + * Compare two SearchQueryRecord, based on the given MetricType + * + * @param a the first SearchQueryRecord to compare + * @param b the second SearchQueryRecord to compare + * @param metricType the MetricType to compare on + * @return 0 if the first SearchQueryRecord is numerically equal to the second SearchQueryRecord; + * -1 if the first SearchQueryRecord is numerically less than the second SearchQueryRecord; + * 1 if the first SearchQueryRecord is numerically greater than the second SearchQueryRecord. + */ + public static int compare(final SearchQueryRecord a, final SearchQueryRecord b, final MetricType metricType) { + return metricType.compare(a.getMeasurement(metricType), b.getMeasurement(metricType)); + } + + /** + * Check if a SearchQueryRecord is deep equal to another record + * + * @param o the other SearchQueryRecord record + * @return true if two records are deep equal, false otherwise. + */ + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SearchQueryRecord)) { + return false; + } + final SearchQueryRecord other = (SearchQueryRecord) o; + return timestamp == other.getTimestamp() + && measurements.equals(other.getMeasurements()) + && attributes.size() == other.getAttributes().size(); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, measurements, attributes); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index ea62093d0c893..80c9e2227e9fe 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -52,7 +52,7 @@ /** * Service that helps track resource usage of tasks running on a node. */ -@PublicApi(since = "2.15.0") +@PublicApi(since = "2.16.0") @SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes") public class TaskResourceTrackingService implements RunnableTaskExecutionListener { @@ -359,7 +359,7 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() { /** * Listener that gets invoked when a task execution completes. */ - @PublicApi(since = "2.15.0") + @PublicApi(since = "2.16.0") public interface TaskCompletionListener { void onTaskCompleted(Task task); } From cc335cc8a2a15778a2291b30d1aa84f7317a6592 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 9 Jul 2024 12:35:39 -0700 Subject: [PATCH 03/15] add more unit tests Signed-off-by: Chenyang Ji --- .../insights/QueryInsightsTestUtils.java | 226 ++++++++++++++++++ .../listener/QueryInsightsListenerTests.java | 222 +++++++++++++++++ 2 files changed, 448 insertions(+) create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java new file mode 100644 index 0000000000000..54cafdc97ea74 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -0,0 +1,226 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.util.Maps; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.VersionUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress; +import static org.opensearch.test.OpenSearchTestCase.random; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; +import static org.opensearch.test.OpenSearchTestCase.randomArray; +import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; +import static org.opensearch.test.OpenSearchTestCase.randomLong; +import static org.opensearch.test.OpenSearchTestCase.randomLongBetween; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +final public class QueryInsightsTestUtils { + + public QueryInsightsTestUtils() {} + + public static List generateQueryInsightRecords(int count) { + return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); + } + + /** + * Creates a List of random Query Insight Records for testing purpose + */ + public static List generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) { + List records = new ArrayList<>(); + int countOfRecords = randomIntBetween(lower, upper); + long timestamp = startTimeStamp; + for (int i = 0; i < countOfRecords; ++i) { + Map measurements = Map.of( + MetricType.LATENCY, + randomLongBetween(1000, 10000), + MetricType.CPU, + randomLongBetween(1000, 10000), + MetricType.MEMORY, + randomLongBetween(1000, 10000) + ); + + Map phaseLatencyMap = new HashMap<>(); + int countOfPhases = randomIntBetween(2, 5); + for (int j = 0; j < countOfPhases; ++j) { + phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, "{\"size\":20}"); + attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); + attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); + attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + attributes.put( + Attribute.TASK_RESOURCE_USAGES, + List.of( + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ), + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ) + ) + ); + + records.add(new SearchQueryRecord(timestamp, measurements, attributes)); + timestamp += interval; + } + return records; + } + + public static TopQueries createRandomTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = generateQueryInsightRecords(10); + + return new TopQueries(node, records); + } + + public static TopQueries createFixedTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = new ArrayList<>(); + records.add(createFixedSearchQueryRecord()); + + return new TopQueries(node, records); + } + + public static SearchQueryRecord createFixedSearchQueryRecord() { + long timestamp = 1706574180000L; + Map measurements = Map.of(MetricType.LATENCY, 1L); + + Map phaseLatencyMap = new HashMap<>(); + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + + return new SearchQueryRecord(timestamp, measurements, attributes); + } + + public static void compareJson(ToXContent param1, ToXContent param2) throws IOException { + if (param1 == null || param2 == null) { + assertNull(param1); + assertNull(param2); + return; + } + + ToXContent.Params params = ToXContent.EMPTY_PARAMS; + XContentBuilder param1Builder = jsonBuilder(); + param1.toXContent(param1Builder, params); + + XContentBuilder param2Builder = jsonBuilder(); + param2.toXContent(param2Builder, params); + + assertEquals(param1Builder.toString(), param2Builder.toString()); + } + + @SuppressWarnings("unchecked") + public static boolean checkRecordsEquals(List records1, List records2) { + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!records1.get(i).equals(records2.get(i))) { + return false; + } + Map attributes1 = records1.get(i).getAttributes(); + Map attributes2 = records2.get(i).getAttributes(); + for (Map.Entry entry : attributes1.entrySet()) { + Attribute attribute = entry.getKey(); + Object value = entry.getValue(); + if (!attributes2.containsKey(attribute)) { + return false; + } + if (value instanceof Object[] && !Arrays.deepEquals((Object[]) value, (Object[]) attributes2.get(attribute))) { + return false; + } else if (value instanceof Map + && !Maps.deepEquals((Map) value, (Map) attributes2.get(attribute))) { + return false; + } + } + } + return true; + } + + public static boolean checkRecordsEqualsWithoutOrder( + List records1, + List records2, + MetricType metricType + ) { + Set set2 = new TreeSet<>((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + set2.addAll(records2); + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!set2.contains(records1.get(i))) { + return false; + } + } + return true; + } + + public static void registerAllQueryInsightsSettings(ClusterSettings clusterSettings) { + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java new file mode 100644 index 0000000000000..051a7105a0dc0 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -0,0 +1,222 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchTask; +import org.opensearch.action.search.SearchType; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.TopQueriesService; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; + +import org.mockito.ArgumentCaptor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit Tests for {@link QueryInsightsListener}. + */ +public class QueryInsightsListenerTests extends OpenSearchTestCase { + private final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + private final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + private final SearchRequest searchRequest = mock(SearchRequest.class); + private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); + private final TopQueriesService topQueriesService = mock(TopQueriesService.class); + private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class); + private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); + private ClusterService clusterService; + + @Before + public void setup() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); + ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2)); + clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); + ClusterServiceUtils.setState(clusterService, state); + clusterService.setTaskResourceTrackingService(taskResourceTrackingService); + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); + + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + threadPool.getThreadContext().setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>())); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + IOUtils.close(clusterService); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + public void testOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + SearchTask task = new SearchTask( + 0, + "n/a", + "n/a", + () -> "test", + TaskId.EMPTY_TASK_ID, + Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") + ); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + when(searchPhaseContext.getTask()).thenReturn(task); + ArgumentCaptor captor = ArgumentCaptor.forClass(SearchQueryRecord.class); + + queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); + + verify(queryInsightsService, times(1)).addRecord(captor.capture()); + SearchQueryRecord generatedRecord = captor.getValue(); + assertEquals(timestamp.longValue(), generatedRecord.getTimestamp()); + assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS)); + assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE)); + assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); + Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); + assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); + verify(taskResourceTrackingService, times(1)).refreshResourceStats(task); + } + + public void testConcurrentOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + SearchTask task = new SearchTask( + 0, + "n/a", + "n/a", + () -> "test", + TaskId.EMPTY_TASK_ID, + Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") + ); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + final List searchListenersList = new ArrayList<>(); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + when(searchPhaseContext.getTask()).thenReturn(task); + + int numRequests = 50; + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService)); + } + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + QueryInsightsListener thisListener = searchListenersList.get(finalI); + thisListener.onRequestEnd(searchPhaseContext, searchRequestContext); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(queryInsightsService, times(numRequests)).addRecord(any()); + verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task); + } + + public void testSetEnabled() { + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); + assertTrue(queryInsightsListener.isEnabled()); + + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.MEMORY)).thenReturn(false); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false); + assertFalse(queryInsightsListener.isEnabled()); + } +} From 7185e59980b3a12252a09dc6202668df3e4e1a20 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Fri, 12 Jul 2024 12:48:28 -0700 Subject: [PATCH 04/15] remove query insights plugin related code Signed-off-by: Chenyang Ji --- .../core/listener/QueryInsightsListener.java | 3 - .../insights/rules/model/Attribute.java | 70 ------------------- .../rules/model/SearchQueryRecord.java | 8 +-- .../insights/QueryInsightsTestUtils.java | 21 ------ .../listener/QueryInsightsListenerTests.java | 5 -- 5 files changed, 2 insertions(+), 105 deletions(-) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 63ed7a51da896..a1f810ad5987c 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -145,9 +145,6 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { SearchTask searchTask = context.getTask(); List tasksResourceUsages = searchRequestContext.getPhaseResourceUsage(); - if (clusterService.getTaskResourceTrackingService() != null) { - clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask); - } tasksResourceUsages.add( new TaskResourceInfo( searchTask.getAction(), diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index 80c80fb6b6937..dcdb085fdc6fa 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -8,18 +8,11 @@ package org.opensearch.plugin.insights.rules.model; -import org.apache.lucene.util.ArrayUtil; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Locale; -import java.util.Map; /** * Valid attributes for a search query record @@ -82,69 +75,6 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO out.writeString(attribute.toString()); } - /** - * Write Attribute value to a StreamOutput - * @param out the StreamOutput to write - * @param attributeValue the Attribute value to write - */ - @SuppressWarnings("unchecked") - public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException { - if (attributeValue instanceof List) { - out.writeList((List) attributeValue); - } else { - out.writeGenericValue(attributeValue); - } - } - - /** - * Read attribute value from the input stream given the Attribute type - * - * @param in the {@link StreamInput} input to read - * @param attribute attribute type to differentiate between Source and others - * @return parse value - * @throws IOException IOException - */ - public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException { - if (attribute == Attribute.TASK_RESOURCE_USAGES) { - return in.readList(TaskResourceInfo::readFromStream); - } else { - return in.readGenericValue(); - } - } - - /** - * Read attribute map from the input stream - * - * @param in the {@link StreamInput} to read - * @return parsed attribute map - * @throws IOException IOException - */ - public static Map readAttributeMap(StreamInput in) throws IOException { - int size = readArraySize(in); - if (size == 0) { - return Collections.emptyMap(); - } - Map map = new HashMap<>(size); - - for (int i = 0; i < size; i++) { - Attribute key = readFromStream(in); - Object value = readAttributeValue(in, key); - map.put(key, value); - } - return map; - } - - private static int readArraySize(StreamInput in) throws IOException { - final int arraySize = in.readVInt(); - if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) { - throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize); - } - if (arraySize < 0) { - throw new NegativeArraySizeException("array size must be positive but was: " + arraySize); - } - return arraySize; - } - @Override public String toString() { return this.name().toLowerCase(Locale.ROOT); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index a6e6b4a9051f0..fec00a680ae58 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -45,7 +45,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce measurements = new HashMap<>(); in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); - this.attributes = Attribute.readAttributeMap(in); + this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); } /** @@ -134,11 +134,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); - out.writeMap( - attributes, - (stream, attribute) -> Attribute.writeTo(out, attribute), - (stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue) - ); + out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); } /** diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 54cafdc97ea74..7fa4e9841c20e 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -12,8 +12,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.util.Maps; -import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; -import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; @@ -82,25 +80,6 @@ public static List generateQueryInsightRecords(int lower, int attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); - attributes.put( - Attribute.TASK_RESOURCE_USAGES, - List.of( - new TaskResourceInfo( - randomAlphaOfLengthBetween(5, 10), - randomLongBetween(1, 1000), - randomLongBetween(1, 1000), - randomAlphaOfLengthBetween(5, 10), - new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) - ), - new TaskResourceInfo( - randomAlphaOfLengthBetween(5, 10), - randomLongBetween(1, 1000), - randomLongBetween(1, 1000), - randomAlphaOfLengthBetween(5, 10), - new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) - ) - ) - ); records.add(new SearchQueryRecord(timestamp, measurements, attributes)); timestamp += interval; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index 051a7105a0dc0..86de44c680188 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -32,7 +32,6 @@ import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -66,7 +65,6 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = mock(TopQueriesService.class); - private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class); private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); private ClusterService clusterService; @@ -79,7 +77,6 @@ public void setup() { ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2)); clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); ClusterServiceUtils.setState(clusterService, state); - clusterService.setTaskResourceTrackingService(taskResourceTrackingService); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); @@ -142,7 +139,6 @@ public void testOnRequestEnd() throws InterruptedException { assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); - verify(taskResourceTrackingService, times(1)).refreshResourceStats(task); } public void testConcurrentOnRequestEnd() throws InterruptedException { @@ -204,7 +200,6 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { countDownLatch.await(); verify(queryInsightsService, times(numRequests)).addRecord(any()); - verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task); } public void testSetEnabled() { From 110e4fe9403346fac4c27c1599e8f587fabd1468 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 16 Jul 2024 21:15:07 -0700 Subject: [PATCH 05/15] create per request listener to refresh task resource usage Signed-off-by: Chenyang Ji --- ...estOperationsCompositeListenerFactory.java | 4 +- .../action/search/TransportSearchAction.java | 4 +- .../cluster/service/ClusterService.java | 20 --------- .../main/java/org/opensearch/node/Node.java | 1 - .../SearchTaskResourceOperationsListener.java | 43 +++++++++++++++++++ .../tasks/TaskResourceTrackingService.java | 3 -- 6 files changed, 48 insertions(+), 27 deletions(-) create mode 100644 server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java index db487bf945889..842a0e2dc61fe 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java @@ -69,8 +69,8 @@ public SearchRequestOperationsListener.CompositeListener buildCompositeListener( final SearchRequestOperationsListener... perRequestListeners ) { final List searchListenersList = Stream.concat( - searchRequestListenersList.stream(), - Arrays.stream(perRequestListeners) + Arrays.stream(perRequestListeners), + searchRequestListenersList.stream() ) .filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest))) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 7d3237d43cd5c..c250b3893f199 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -86,6 +86,7 @@ import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.SearchTaskResourceOperationsListener; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.telemetry.metrics.MetricsRegistry; @@ -433,7 +434,8 @@ private void executeRequest( requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener( originalSearchRequest, logger, - TraceableSearchRequestOperationsListener.create(tracer, requestSpan) + TraceableSearchRequestOperationsListener.create(tracer, requestSpan), + new SearchTaskResourceOperationsListener(taskResourceTrackingService) ); SearchRequestContext searchRequestContext = new SearchRequestContext( requestOperationsListeners, diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 4ece885a55b70..c3c48dd8b87ef 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -54,7 +54,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexingPressureService; import org.opensearch.node.Node; -import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.threadpool.ThreadPool; @@ -93,7 +92,6 @@ public class ClusterService extends AbstractLifecycleComponent { private RerouteService rerouteService; private IndexingPressureService indexingPressureService; - private TaskResourceTrackingService taskResourceTrackingService; public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); @@ -267,24 +265,6 @@ public IndexingPressureService getIndexingPressureService() { return indexingPressureService; } - /** - * Getter for {@link TaskResourceTrackingService}, This method exposes task level resource usage for other components to use. - * - * @return TaskResourceTrackingService - */ - public TaskResourceTrackingService getTaskResourceTrackingService() { - return taskResourceTrackingService; - } - - /** - * Setter for {@link TaskResourceTrackingService} - * - * @param taskResourceTrackingService taskResourceTrackingService - */ - public void setTaskResourceTrackingService(TaskResourceTrackingService taskResourceTrackingService) { - this.taskResourceTrackingService = taskResourceTrackingService; - } - public ClusterApplierService getClusterApplierService() { return clusterApplierService; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 798c95ed73dea..d91b2a45a48c6 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1122,7 +1122,6 @@ protected Node( clusterService.getClusterSettings(), threadPool ); - clusterService.setTaskResourceTrackingService(taskResourceTrackingService); final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( settings, diff --git a/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java b/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java new file mode 100644 index 0000000000000..56686388f7aa1 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; + +/** + * SearchTaskResourceOperationsListener subscriber for operations on search tasks resource usages + * + * @opensearch.internal + */ +public final class SearchTaskResourceOperationsListener extends SearchRequestOperationsListener { + private final TaskResourceTrackingService taskResourceTrackingService; + + public SearchTaskResourceOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { + this.taskResourceTrackingService = taskResourceTrackingService; + } + + @Override + protected void onPhaseStart(SearchPhaseContext context) {} + + @Override + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + taskResourceTrackingService.refreshResourceStats(context.getTask()); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index 80c9e2227e9fe..ca1957cdb1633 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -16,7 +16,6 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -52,7 +51,6 @@ /** * Service that helps track resource usage of tasks running on a node. */ -@PublicApi(since = "2.16.0") @SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes") public class TaskResourceTrackingService implements RunnableTaskExecutionListener { @@ -359,7 +357,6 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() { /** * Listener that gets invoked when a task execution completes. */ - @PublicApi(since = "2.16.0") public interface TaskCompletionListener { void onTaskCompleted(Task task); } From 545e2354ab786aa105aa1060c9ca23de96d8a2d7 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 18 Jul 2024 12:48:22 -0700 Subject: [PATCH 06/15] Make new listener API public Signed-off-by: Siddhant Deshmukh --- .../tasks/SearchTaskResourceOperationsListener.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java b/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java index 56686388f7aa1..e7f752c7af183 100644 --- a/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java +++ b/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java @@ -13,9 +13,11 @@ import org.opensearch.action.search.SearchRequestOperationsListener; /** - * SearchTaskResourceOperationsListener subscriber for operations on search tasks resource usages + * SearchTaskResourceOperationsListener subscriber for operations on search tasks resource usages. + * Listener ensures to refreshResourceStats on request end capturing the search task resource usage + * upon request completion. * - * @opensearch.internal + * @PublicApi */ public final class SearchTaskResourceOperationsListener extends SearchRequestOperationsListener { private final TaskResourceTrackingService taskResourceTrackingService; From 64b779a49a7cbc6675c2d25f1a2dabf09e34ce1a Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 18 Jul 2024 12:55:13 -0700 Subject: [PATCH 07/15] Add changelog Signed-off-by: Siddhant Deshmukh --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec5b838a542c4..326f00156a3cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) - Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273)) - Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847))) +- Create public listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) From 837ad2605283db6002f73453f2815b672aafc255 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 18 Jul 2024 12:57:58 -0700 Subject: [PATCH 08/15] Remove wrong files added Signed-off-by: Siddhant Deshmukh --- .../core/listener/QueryInsightsListener.java | 202 ---------------- .../insights/rules/model/Attribute.java | 82 ------- .../rules/model/SearchQueryRecord.java | 183 --------------- .../insights/QueryInsightsTestUtils.java | 205 ----------------- .../listener/QueryInsightsListenerTests.java | 217 ------------------ 5 files changed, 889 deletions(-) delete mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java delete mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java delete mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java delete mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java delete mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java deleted file mode 100644 index a1f810ad5987c..0000000000000 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.insights.core.listener; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.search.SearchPhaseContext; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchRequestContext; -import org.opensearch.action.search.SearchRequestOperationsListener; -import org.opensearch.action.search.SearchTask; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; -import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.plugin.insights.core.service.QueryInsightsService; -import org.opensearch.plugin.insights.rules.model.Attribute; -import org.opensearch.plugin.insights.rules.model.MetricType; -import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; -import org.opensearch.tasks.Task; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting; - -/** - * The listener for query insights services. - * It forwards query-related data to the appropriate query insights stores, - * either for each request or for each phase. - * - * @opensearch.internal - */ -public final class QueryInsightsListener extends SearchRequestOperationsListener { - private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); - - private static final Logger log = LogManager.getLogger(QueryInsightsListener.class); - - private final QueryInsightsService queryInsightsService; - private final ClusterService clusterService; - - /** - * Constructor for QueryInsightsListener - * - * @param clusterService The Node's cluster service. - * @param queryInsightsService The topQueriesByLatencyService associated with this listener - */ - @Inject - public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { - this.clusterService = clusterService; - this.queryInsightsService = queryInsightsService; - // Setting endpoints set up for top n queries, including enabling top n queries, window size and top n size - // Expected metricTypes are Latency, CPU and Memory. - for (MetricType type : MetricType.allMetricTypes()) { - clusterService.getClusterSettings() - .addSettingsUpdateConsumer(getTopNEnabledSetting(type), v -> this.setEnableTopQueries(type, v)); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - getTopNSizeSetting(type), - v -> this.queryInsightsService.setTopNSize(type, v), - v -> this.queryInsightsService.validateTopNSize(type, v) - ); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - getTopNWindowSizeSetting(type), - v -> this.queryInsightsService.setWindowSize(type, v), - v -> this.queryInsightsService.validateWindowSize(type, v) - ); - - this.setEnableTopQueries(type, clusterService.getClusterSettings().get(getTopNEnabledSetting(type))); - this.queryInsightsService.validateTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type))); - this.queryInsightsService.setTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type))); - this.queryInsightsService.validateWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type))); - this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type))); - } - } - - /** - * Enable or disable top queries insights collection for {@link MetricType} - * This function will enable or disable the corresponding listeners - * and query insights services. - * - * @param metricType {@link MetricType} - * @param enabled boolean - */ - public void setEnableTopQueries(final MetricType metricType, final boolean enabled) { - boolean isAllMetricsDisabled = !queryInsightsService.isEnabled(); - this.queryInsightsService.enableCollection(metricType, enabled); - if (!enabled) { - // disable QueryInsightsListener only if all metrics collections are disabled now. - if (!queryInsightsService.isEnabled()) { - super.setEnabled(false); - this.queryInsightsService.stop(); - } - } else { - super.setEnabled(true); - // restart QueryInsightsListener only if none of metrics collections is enabled before. - if (isAllMetricsDisabled) { - this.queryInsightsService.stop(); - this.queryInsightsService.start(); - } - } - - } - - @Override - public boolean isEnabled() { - return super.isEnabled(); - } - - @Override - public void onPhaseStart(SearchPhaseContext context) {} - - @Override - public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - - @Override - public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} - - @Override - public void onRequestStart(SearchRequestContext searchRequestContext) {} - - @Override - public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { - constructSearchQueryRecord(context, searchRequestContext); - } - - @Override - public void onRequestFailure(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { - constructSearchQueryRecord(context, searchRequestContext); - } - - private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { - SearchTask searchTask = context.getTask(); - List tasksResourceUsages = searchRequestContext.getPhaseResourceUsage(); - tasksResourceUsages.add( - new TaskResourceInfo( - searchTask.getAction(), - searchTask.getId(), - searchTask.getParentTaskId().getId(), - clusterService.localNode().getId(), - searchTask.getTotalResourceStats() - ) - ); - - final SearchRequest request = context.getRequest(); - try { - Map measurements = new HashMap<>(); - if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) { - measurements.put( - MetricType.LATENCY, - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) - ); - } - if (queryInsightsService.isCollectionEnabled(MetricType.CPU)) { - measurements.put( - MetricType.CPU, - tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum() - ); - } - if (queryInsightsService.isCollectionEnabled(MetricType.MEMORY)) { - measurements.put( - MetricType.MEMORY, - tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum() - ); - } - Map attributes = new HashMap<>(); - attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT)); - attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS)); - attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); - attributes.put(Attribute.INDICES, request.indices()); - attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); - attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages); - - Map labels = new HashMap<>(); - // Retrieve user provided label if exists - String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID); - if (userProvidedLabel != null) { - labels.put(Task.X_OPAQUE_ID, userProvidedLabel); - } - attributes.put(Attribute.LABELS, labels); - // construct SearchQueryRecord from attributes and measurements - SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); - queryInsightsService.addRecord(record); - } catch (Exception e) { - log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); - } - } - -} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java deleted file mode 100644 index dcdb085fdc6fa..0000000000000 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.insights.rules.model; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; - -import java.io.IOException; -import java.util.Locale; - -/** - * Valid attributes for a search query record - * - * @opensearch.internal - */ -public enum Attribute { - /** - * The search query type - */ - SEARCH_TYPE, - /** - * The search query source - */ - SOURCE, - /** - * Total shards queried - */ - TOTAL_SHARDS, - /** - * The indices involved - */ - INDICES, - /** - * The per phase level latency map for a search query - */ - PHASE_LATENCY_MAP, - /** - * The node id for this request - */ - NODE_ID, - /** - * Tasks level resource usages in this request - */ - TASK_RESOURCE_USAGES, - /** - * Custom search request labels - */ - LABELS; - - /** - * Read an Attribute from a StreamInput - * - * @param in the StreamInput to read from - * @return Attribute - * @throws IOException IOException - */ - static Attribute readFromStream(final StreamInput in) throws IOException { - return Attribute.valueOf(in.readString().toUpperCase(Locale.ROOT)); - } - - /** - * Write Attribute to a StreamOutput - * - * @param out the StreamOutput to write - * @param attribute the Attribute to write - * @throws IOException IOException - */ - static void writeTo(final StreamOutput out, final Attribute attribute) throws IOException { - out.writeString(attribute.toString()); - } - - @Override - public String toString() { - return this.name().toLowerCase(Locale.ROOT); - } -} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java deleted file mode 100644 index fec00a680ae58..0000000000000 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.insights.rules.model; - -import org.opensearch.core.common.Strings; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentBuilder; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -/** - * SearchQueryRecord represents a minimal atomic record stored in the Query Insight Framework, - * which contains extensive information related to a search query. - * - * @opensearch.internal - */ -public class SearchQueryRecord implements ToXContentObject, Writeable { - private final long timestamp; - private final Map measurements; - private final Map attributes; - - /** - * Constructor of SearchQueryRecord - * - * @param in the StreamInput to read the SearchQueryRecord from - * @throws IOException IOException - * @throws ClassCastException ClassCastException - */ - public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { - this.timestamp = in.readLong(); - measurements = new HashMap<>(); - in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) - .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); - this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); - } - - /** - * Constructor of SearchQueryRecord - * - * @param timestamp The timestamp of the query. - * @param measurements A list of Measurement associated with this query - * @param attributes A list of Attributes associated with this query - */ - public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { - if (measurements == null) { - throw new IllegalArgumentException("Measurements cannot be null"); - } - this.measurements = measurements; - this.attributes = attributes; - this.timestamp = timestamp; - } - - /** - * Returns the observation time of the metric. - * - * @return the observation time in milliseconds - */ - public long getTimestamp() { - return timestamp; - } - - /** - * Returns the measurement associated with the specified name. - * - * @param name the name of the measurement - * @return the measurement object, or null if not found - */ - public Number getMeasurement(final MetricType name) { - return measurements.get(name); - } - - /** - * Returns a map of all the measurements associated with the metric. - * - * @return a map of measurement names to measurement objects - */ - public Map getMeasurements() { - return measurements; - } - - /** - * Returns a map of the attributes associated with the metric. - * - * @return a map of attribute keys to attribute values - */ - public Map getAttributes() { - return attributes; - } - - /** - * Add an attribute to this record - * - * @param attribute attribute to add - * @param value the value associated with the attribute - */ - public void addAttribute(final Attribute attribute, final Object value) { - attributes.put(attribute, value); - } - - @Override - public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { - builder.startObject(); - builder.field("timestamp", timestamp); - for (Map.Entry entry : attributes.entrySet()) { - builder.field(entry.getKey().toString(), entry.getValue()); - } - for (Map.Entry entry : measurements.entrySet()) { - builder.field(entry.getKey().toString(), entry.getValue()); - } - return builder.endObject(); - } - - /** - * Write a SearchQueryRecord to a StreamOutput - * - * @param out the StreamOutput to write - * @throws IOException IOException - */ - @Override - public void writeTo(final StreamOutput out) throws IOException { - out.writeLong(timestamp); - out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); - out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); - } - - /** - * Compare two SearchQueryRecord, based on the given MetricType - * - * @param a the first SearchQueryRecord to compare - * @param b the second SearchQueryRecord to compare - * @param metricType the MetricType to compare on - * @return 0 if the first SearchQueryRecord is numerically equal to the second SearchQueryRecord; - * -1 if the first SearchQueryRecord is numerically less than the second SearchQueryRecord; - * 1 if the first SearchQueryRecord is numerically greater than the second SearchQueryRecord. - */ - public static int compare(final SearchQueryRecord a, final SearchQueryRecord b, final MetricType metricType) { - return metricType.compare(a.getMeasurement(metricType), b.getMeasurement(metricType)); - } - - /** - * Check if a SearchQueryRecord is deep equal to another record - * - * @param o the other SearchQueryRecord record - * @return true if two records are deep equal, false otherwise. - */ - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (!(o instanceof SearchQueryRecord)) { - return false; - } - final SearchQueryRecord other = (SearchQueryRecord) o; - return timestamp == other.getTimestamp() - && measurements.equals(other.getMeasurements()) - && attributes.size() == other.getAttributes().size(); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, measurements, attributes); - } - - @Override - public String toString() { - return Strings.toString(MediaTypeRegistry.JSON, this); - } -} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java deleted file mode 100644 index 7fa4e9841c20e..0000000000000 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.insights; - -import org.opensearch.action.search.SearchType; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.util.Maps; -import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; -import org.opensearch.plugin.insights.rules.model.Attribute; -import org.opensearch.plugin.insights.rules.model.MetricType; -import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; -import org.opensearch.plugin.insights.settings.QueryInsightsSettings; -import org.opensearch.test.VersionUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress; -import static org.opensearch.test.OpenSearchTestCase.random; -import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; -import static org.opensearch.test.OpenSearchTestCase.randomArray; -import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; -import static org.opensearch.test.OpenSearchTestCase.randomLong; -import static org.opensearch.test.OpenSearchTestCase.randomLongBetween; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -final public class QueryInsightsTestUtils { - - public QueryInsightsTestUtils() {} - - public static List generateQueryInsightRecords(int count) { - return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); - } - - /** - * Creates a List of random Query Insight Records for testing purpose - */ - public static List generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) { - List records = new ArrayList<>(); - int countOfRecords = randomIntBetween(lower, upper); - long timestamp = startTimeStamp; - for (int i = 0; i < countOfRecords; ++i) { - Map measurements = Map.of( - MetricType.LATENCY, - randomLongBetween(1000, 10000), - MetricType.CPU, - randomLongBetween(1000, 10000), - MetricType.MEMORY, - randomLongBetween(1000, 10000) - ); - - Map phaseLatencyMap = new HashMap<>(); - int countOfPhases = randomIntBetween(2, 5); - for (int j = 0; j < countOfPhases; ++j) { - phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); - } - Map attributes = new HashMap<>(); - attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); - attributes.put(Attribute.SOURCE, "{\"size\":20}"); - attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); - attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); - attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); - - records.add(new SearchQueryRecord(timestamp, measurements, attributes)); - timestamp += interval; - } - return records; - } - - public static TopQueries createRandomTopQueries() { - DiscoveryNode node = new DiscoveryNode( - "node_for_top_queries_test", - buildNewFakeTransportAddress(), - emptyMap(), - emptySet(), - VersionUtils.randomVersion(random()) - ); - List records = generateQueryInsightRecords(10); - - return new TopQueries(node, records); - } - - public static TopQueries createFixedTopQueries() { - DiscoveryNode node = new DiscoveryNode( - "node_for_top_queries_test", - buildNewFakeTransportAddress(), - emptyMap(), - emptySet(), - VersionUtils.randomVersion(random()) - ); - List records = new ArrayList<>(); - records.add(createFixedSearchQueryRecord()); - - return new TopQueries(node, records); - } - - public static SearchQueryRecord createFixedSearchQueryRecord() { - long timestamp = 1706574180000L; - Map measurements = Map.of(MetricType.LATENCY, 1L); - - Map phaseLatencyMap = new HashMap<>(); - Map attributes = new HashMap<>(); - attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); - - return new SearchQueryRecord(timestamp, measurements, attributes); - } - - public static void compareJson(ToXContent param1, ToXContent param2) throws IOException { - if (param1 == null || param2 == null) { - assertNull(param1); - assertNull(param2); - return; - } - - ToXContent.Params params = ToXContent.EMPTY_PARAMS; - XContentBuilder param1Builder = jsonBuilder(); - param1.toXContent(param1Builder, params); - - XContentBuilder param2Builder = jsonBuilder(); - param2.toXContent(param2Builder, params); - - assertEquals(param1Builder.toString(), param2Builder.toString()); - } - - @SuppressWarnings("unchecked") - public static boolean checkRecordsEquals(List records1, List records2) { - if (records1.size() != records2.size()) { - return false; - } - for (int i = 0; i < records1.size(); i++) { - if (!records1.get(i).equals(records2.get(i))) { - return false; - } - Map attributes1 = records1.get(i).getAttributes(); - Map attributes2 = records2.get(i).getAttributes(); - for (Map.Entry entry : attributes1.entrySet()) { - Attribute attribute = entry.getKey(); - Object value = entry.getValue(); - if (!attributes2.containsKey(attribute)) { - return false; - } - if (value instanceof Object[] && !Arrays.deepEquals((Object[]) value, (Object[]) attributes2.get(attribute))) { - return false; - } else if (value instanceof Map - && !Maps.deepEquals((Map) value, (Map) attributes2.get(attribute))) { - return false; - } - } - } - return true; - } - - public static boolean checkRecordsEqualsWithoutOrder( - List records1, - List records2, - MetricType metricType - ) { - Set set2 = new TreeSet<>((a, b) -> SearchQueryRecord.compare(a, b, metricType)); - set2.addAll(records2); - if (records1.size() != records2.size()) { - return false; - } - for (int i = 0; i < records1.size(); i++) { - if (!set2.contains(records1.get(i))) { - return false; - } - } - return true; - } - - public static void registerAllQueryInsightsSettings(ClusterSettings clusterSettings) { - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS); - } -} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java deleted file mode 100644 index 86de44c680188..0000000000000 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.insights.core.listener; - -import org.opensearch.action.search.SearchPhaseContext; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchRequestContext; -import org.opensearch.action.search.SearchTask; -import org.opensearch.action.search.SearchType; -import org.opensearch.action.support.replication.ClusterStateCreationUtils; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.collect.Tuple; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.io.IOUtils; -import org.opensearch.core.tasks.TaskId; -import org.opensearch.plugin.insights.QueryInsightsTestUtils; -import org.opensearch.plugin.insights.core.service.QueryInsightsService; -import org.opensearch.plugin.insights.core.service.TopQueriesService; -import org.opensearch.plugin.insights.rules.model.Attribute; -import org.opensearch.plugin.insights.rules.model.MetricType; -import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; -import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.opensearch.search.aggregations.support.ValueType; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.tasks.Task; -import org.opensearch.test.ClusterServiceUtils; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; - -import org.mockito.ArgumentCaptor; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Unit Tests for {@link QueryInsightsListener}. - */ -public class QueryInsightsListenerTests extends OpenSearchTestCase { - private final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); - private final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); - private final SearchRequest searchRequest = mock(SearchRequest.class); - private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); - private final TopQueriesService topQueriesService = mock(TopQueriesService.class); - private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); - private ClusterService clusterService; - - @Before - public void setup() { - Settings.Builder settingsBuilder = Settings.builder(); - Settings settings = settingsBuilder.build(); - ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); - ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2)); - clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); - ClusterServiceUtils.setState(clusterService, state); - when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); - when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); - - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - threadPool.getThreadContext().setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>())); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - IOUtils.close(clusterService); - ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); - } - - @SuppressWarnings("unchecked") - public void testOnRequestEnd() throws InterruptedException { - Long timestamp = System.currentTimeMillis() - 100L; - SearchType searchType = SearchType.QUERY_THEN_FETCH; - - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); - searchSourceBuilder.size(0); - SearchTask task = new SearchTask( - 0, - "n/a", - "n/a", - () -> "test", - TaskId.EMPTY_TASK_ID, - Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") - ); - - String[] indices = new String[] { "index-1", "index-2" }; - - Map phaseLatencyMap = new HashMap<>(); - phaseLatencyMap.put("expand", 0L); - phaseLatencyMap.put("query", 20L); - phaseLatencyMap.put("fetch", 1L); - - int numberOfShards = 10; - - QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); - - when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); - when(searchRequest.searchType()).thenReturn(searchType); - when(searchRequest.source()).thenReturn(searchSourceBuilder); - when(searchRequest.indices()).thenReturn(indices); - when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); - when(searchPhaseContext.getRequest()).thenReturn(searchRequest); - when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); - when(searchPhaseContext.getTask()).thenReturn(task); - ArgumentCaptor captor = ArgumentCaptor.forClass(SearchQueryRecord.class); - - queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); - - verify(queryInsightsService, times(1)).addRecord(captor.capture()); - SearchQueryRecord generatedRecord = captor.getValue(); - assertEquals(timestamp.longValue(), generatedRecord.getTimestamp()); - assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS)); - assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE)); - assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); - Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); - assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); - } - - public void testConcurrentOnRequestEnd() throws InterruptedException { - Long timestamp = System.currentTimeMillis() - 100L; - SearchType searchType = SearchType.QUERY_THEN_FETCH; - - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); - searchSourceBuilder.size(0); - SearchTask task = new SearchTask( - 0, - "n/a", - "n/a", - () -> "test", - TaskId.EMPTY_TASK_ID, - Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") - ); - - String[] indices = new String[] { "index-1", "index-2" }; - - Map phaseLatencyMap = new HashMap<>(); - phaseLatencyMap.put("expand", 0L); - phaseLatencyMap.put("query", 20L); - phaseLatencyMap.put("fetch", 1L); - - int numberOfShards = 10; - - final List searchListenersList = new ArrayList<>(); - - when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); - when(searchRequest.searchType()).thenReturn(searchType); - when(searchRequest.source()).thenReturn(searchSourceBuilder); - when(searchRequest.indices()).thenReturn(indices); - when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); - when(searchPhaseContext.getRequest()).thenReturn(searchRequest); - when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); - when(searchPhaseContext.getTask()).thenReturn(task); - - int numRequests = 50; - Thread[] threads = new Thread[numRequests]; - Phaser phaser = new Phaser(numRequests + 1); - CountDownLatch countDownLatch = new CountDownLatch(numRequests); - - for (int i = 0; i < numRequests; i++) { - searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService)); - } - - for (int i = 0; i < numRequests; i++) { - int finalI = i; - threads[i] = new Thread(() -> { - phaser.arriveAndAwaitAdvance(); - QueryInsightsListener thisListener = searchListenersList.get(finalI); - thisListener.onRequestEnd(searchPhaseContext, searchRequestContext); - countDownLatch.countDown(); - }); - threads[i].start(); - } - phaser.arriveAndAwaitAdvance(); - countDownLatch.await(); - - verify(queryInsightsService, times(numRequests)).addRecord(any()); - } - - public void testSetEnabled() { - when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); - QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); - queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); - assertTrue(queryInsightsListener.isEnabled()); - - when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false); - when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false); - when(queryInsightsService.isCollectionEnabled(MetricType.MEMORY)).thenReturn(false); - queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false); - assertFalse(queryInsightsListener.isEnabled()); - } -} From 12c984428756f8d1be8282cae4d80c4bdde9b707 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 18 Jul 2024 13:14:34 -0700 Subject: [PATCH 09/15] Address review comments Signed-off-by: Siddhant Deshmukh --- ...estOperationsCompositeListenerFactory.java | 5 +++-- .../action/search/TransportSearchAction.java | 4 ++-- .../SearchTaskResourceOperationsListener.java | 19 +++---------------- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java index 842a0e2dc61fe..f9d7f5c6b6b2f 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java @@ -69,8 +69,9 @@ public SearchRequestOperationsListener.CompositeListener buildCompositeListener( final SearchRequestOperationsListener... perRequestListeners ) { final List searchListenersList = Stream.concat( - Arrays.stream(perRequestListeners), - searchRequestListenersList.stream() + searchRequestListenersList.stream(), + Arrays.stream(perRequestListeners) + ) .filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest))) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index c250b3893f199..d5cfcce0fd793 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -86,7 +86,7 @@ import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.tasks.CancellableTask; -import org.opensearch.tasks.SearchTaskResourceOperationsListener; +import org.opensearch.tasks.SearchTaskRequestOperationsListener; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.telemetry.metrics.MetricsRegistry; @@ -435,7 +435,7 @@ private void executeRequest( originalSearchRequest, logger, TraceableSearchRequestOperationsListener.create(tracer, requestSpan), - new SearchTaskResourceOperationsListener(taskResourceTrackingService) + new SearchTaskRequestOperationsListener(taskResourceTrackingService) ); SearchRequestContext searchRequestContext = new SearchRequestContext( requestOperationsListeners, diff --git a/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java b/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java index e7f752c7af183..7c336fb1c7210 100644 --- a/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java +++ b/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java @@ -13,31 +13,18 @@ import org.opensearch.action.search.SearchRequestOperationsListener; /** - * SearchTaskResourceOperationsListener subscriber for operations on search tasks resource usages. + * SearchTaskRequestOperationsListener subscriber for operations on search tasks resource usages. * Listener ensures to refreshResourceStats on request end capturing the search task resource usage * upon request completion. * - * @PublicApi */ -public final class SearchTaskResourceOperationsListener extends SearchRequestOperationsListener { +public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener { private final TaskResourceTrackingService taskResourceTrackingService; - public SearchTaskResourceOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { + public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { this.taskResourceTrackingService = taskResourceTrackingService; } - @Override - protected void onPhaseStart(SearchPhaseContext context) {} - - @Override - protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - - @Override - protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} - - @Override - public void onRequestStart(SearchRequestContext searchRequestContext) {} - @Override public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { taskResourceTrackingService.refreshResourceStats(context.getTask()); From 6c9dbf6ff0c24810024d00be7d1a53a6daa669a5 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 18 Jul 2024 13:45:57 -0700 Subject: [PATCH 10/15] Build fix Signed-off-by: Siddhant Deshmukh --- .../search/SearchRequestOperationsCompositeListenerFactory.java | 1 - ...onsListener.java => SearchTaskRequestOperationsListener.java} | 0 2 files changed, 1 deletion(-) rename server/src/main/java/org/opensearch/tasks/{SearchTaskResourceOperationsListener.java => SearchTaskRequestOperationsListener.java} (100%) diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java index f9d7f5c6b6b2f..db487bf945889 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java @@ -71,7 +71,6 @@ public SearchRequestOperationsListener.CompositeListener buildCompositeListener( final List searchListenersList = Stream.concat( searchRequestListenersList.stream(), Arrays.stream(perRequestListeners) - ) .filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest))) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java b/server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java similarity index 100% rename from server/src/main/java/org/opensearch/tasks/SearchTaskResourceOperationsListener.java rename to server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java From 6a92627776eb67b97ad76cc9b2750d5ed8080e42 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 18 Jul 2024 14:45:44 -0700 Subject: [PATCH 11/15] Make singleton Signed-off-by: Siddhant Deshmukh --- .../action/search/TransportSearchAction.java | 2 +- .../tasks/SearchTaskRequestOperationsListener.java | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index d5cfcce0fd793..ceab2343091f7 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -435,7 +435,7 @@ private void executeRequest( originalSearchRequest, logger, TraceableSearchRequestOperationsListener.create(tracer, requestSpan), - new SearchTaskRequestOperationsListener(taskResourceTrackingService) + SearchTaskRequestOperationsListener.getInstance(taskResourceTrackingService) ); SearchRequestContext searchRequestContext = new SearchRequestContext( requestOperationsListeners, diff --git a/server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java b/server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java index 7c336fb1c7210..de46ed6a28581 100644 --- a/server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java @@ -19,12 +19,20 @@ * */ public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener { + private static SearchTaskRequestOperationsListener instance; private final TaskResourceTrackingService taskResourceTrackingService; - public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { + private SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { this.taskResourceTrackingService = taskResourceTrackingService; } + public static synchronized SearchTaskRequestOperationsListener getInstance(TaskResourceTrackingService taskResourceTrackingService) { + if (instance == null) { + instance = new SearchTaskRequestOperationsListener(taskResourceTrackingService); + } + return instance; + } + @Override public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { taskResourceTrackingService.refreshResourceStats(context.getTask()); From 7ca5cfeaace237fc1327cdfaaca3555892f6dc5c Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 18 Jul 2024 16:29:52 -0700 Subject: [PATCH 12/15] Address review comments Signed-off-by: Siddhant Deshmukh --- CHANGELOG.md | 1 + .../SearchTaskRequestOperationsListener.java | 18 ++++-------------- .../action/search/TransportSearchAction.java | 5 +++-- 3 files changed, 8 insertions(+), 16 deletions(-) rename server/src/main/java/org/opensearch/{tasks => action/search}/SearchTaskRequestOperationsListener.java (50%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 326f00156a3cc..991cabd097683 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273)) - Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847))) - Create public listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832)) +- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java similarity index 50% rename from server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java rename to server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java index de46ed6a28581..1a8ca2c5c24df 100644 --- a/server/src/main/java/org/opensearch/tasks/SearchTaskRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java @@ -6,11 +6,9 @@ * compatible open source license. */ -package org.opensearch.tasks; +package org.opensearch.action.search; -import org.opensearch.action.search.SearchPhaseContext; -import org.opensearch.action.search.SearchRequestContext; -import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.tasks.TaskResourceTrackingService; /** * SearchTaskRequestOperationsListener subscriber for operations on search tasks resource usages. @@ -18,21 +16,13 @@ * upon request completion. * */ -public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener { - private static SearchTaskRequestOperationsListener instance; +final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener { private final TaskResourceTrackingService taskResourceTrackingService; - private SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { + SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { this.taskResourceTrackingService = taskResourceTrackingService; } - public static synchronized SearchTaskRequestOperationsListener getInstance(TaskResourceTrackingService taskResourceTrackingService) { - if (instance == null) { - instance = new SearchTaskRequestOperationsListener(taskResourceTrackingService); - } - return instance; - } - @Override public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { taskResourceTrackingService.refreshResourceStats(context.getTask()); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index ceab2343091f7..d97d50fc3b7fe 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -86,7 +86,6 @@ import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.tasks.CancellableTask; -import org.opensearch.tasks.SearchTaskRequestOperationsListener; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.telemetry.metrics.MetricsRegistry; @@ -175,6 +174,7 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexAliasFilter( @@ -435,7 +436,7 @@ private void executeRequest( originalSearchRequest, logger, TraceableSearchRequestOperationsListener.create(tracer, requestSpan), - SearchTaskRequestOperationsListener.getInstance(taskResourceTrackingService) + searchTaskRequestOperationsListener ); SearchRequestContext searchRequestContext = new SearchRequestContext( requestOperationsListeners, From 0e8d668669c088bf8ee8e9f4cbc72b1d39fd2629 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Mon, 22 Jul 2024 18:39:36 -0700 Subject: [PATCH 13/15] Make sure listener runs before plugin listeners Signed-off-by: Siddhant Deshmukh --- .../SearchTaskRequestOperationsListener.java | 4 ++-- .../action/search/TransportSearchAction.java | 5 +---- .../src/main/java/org/opensearch/node/Node.java | 16 +++++++++------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java index 1a8ca2c5c24df..4434d71793b23 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java @@ -16,10 +16,10 @@ * upon request completion. * */ -final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener { +public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener { private final TaskResourceTrackingService taskResourceTrackingService; - SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { + public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { this.taskResourceTrackingService = taskResourceTrackingService; } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index d97d50fc3b7fe..7d3237d43cd5c 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -174,7 +174,6 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexAliasFilter( @@ -435,8 +433,7 @@ private void executeRequest( requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener( originalSearchRequest, logger, - TraceableSearchRequestOperationsListener.create(tracer, requestSpan), - searchTaskRequestOperationsListener + TraceableSearchRequestOperationsListener.create(tracer, requestSpan) ); SearchRequestContext searchRequestContext = new SearchRequestContext( requestOperationsListeners, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d91b2a45a48c6..04faae00a6638 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -52,6 +52,7 @@ import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.action.search.SearchTaskRequestOperationsListener; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; @@ -855,8 +856,15 @@ protected Node( threadPool ); + final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService( + settings, + clusterService.getClusterSettings(), + threadPool + ); + final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings()); final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(taskResourceTrackingService); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings); @@ -988,7 +996,7 @@ protected Node( final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog), + Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener) .map(p -> (SearchRequestOperationsListener) p) @@ -1117,12 +1125,6 @@ protected Node( // development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478). clusterService.setIndexingPressureService(indexingPressureService); - final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService( - settings, - clusterService.getClusterSettings(), - threadPool - ); - final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( settings, clusterService.getClusterSettings() From fa26a0c1f4bc2badf6d12da60b97254004848d4c Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Mon, 22 Jul 2024 19:00:20 -0700 Subject: [PATCH 14/15] Spotless Signed-off-by: Siddhant Deshmukh --- server/src/main/java/org/opensearch/node/Node.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 04faae00a6638..448cb3627651c 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -864,7 +864,9 @@ protected Node( final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings()); final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); - final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(taskResourceTrackingService); + final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener( + taskResourceTrackingService + ); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings); From b056d0530b174ddf494c67459a7f8b8b7eb5ec57 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Mon, 22 Jul 2024 19:02:58 -0700 Subject: [PATCH 15/15] Minor fix Signed-off-by: Siddhant Deshmukh --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 991cabd097683..9d32433992d8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) - Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273)) - Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847))) -- Create public listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832)) - Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832)) ### Dependencies