Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add number shards by node #246

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@

package org.compuscene.metrics.prometheus;

import java.util.logging.LogManager;
import java.util.logging.Logger;
import org.opensearch.action.ClusterStatsData;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
Expand Down Expand Up @@ -91,6 +98,7 @@ public void registerMetrics() {
registerOsMetrics();
registerFsMetrics();
registerESSettings();
registerNumberOfShardsPerNode();
}

private void registerClusterMetrics() {
Expand Down Expand Up @@ -925,7 +933,8 @@ public void updateMetrics(String originNodeName, String originNodeId,
@Nullable ClusterHealthResponse clusterHealthResponse,
NodeStats[] nodeStats,
@Nullable IndicesStatsResponse indicesStats,
@Nullable ClusterStatsData clusterStatsData) {
@Nullable ClusterStatsData clusterStatsData,
@Nullable ClusterStateResponse clusterStateResponse) {
Summary.Timer timer = catalog.startSummaryTimer(
new Tuple<>(originNodeName, originNodeId),
"metrics_generate_time_seconds");
Expand All @@ -936,6 +945,7 @@ public void updateMetrics(String originNodeName, String originNodeId,
String nodeName = s.getNode().getName();
String nodeID = s.getNode().getId();
Tuple<String, String> nodeInfo = new Tuple<>(nodeName, nodeID);
DiscoveryNode node = s.getNode();

updateNodeMetrics(nodeInfo, s);
updateIndicesMetrics(nodeInfo, s.getIndices());
Expand All @@ -949,6 +959,7 @@ public void updateMetrics(String originNodeName, String originNodeId,
updateJVMMetrics(nodeInfo, s.getJvm());
updateOsMetrics(nodeInfo, s.getOs());
updateFsMetrics(nodeInfo, s.getFs());
updateNumberOfShardsPerNode(nodeInfo,clusterStateResponse,node);
}
if (isPrometheusIndices) {
updatePerIndexMetrics(clusterHealthResponse, indicesStats);
Expand All @@ -960,6 +971,33 @@ public void updateMetrics(String originNodeName, String originNodeId,
timer.observeDuration();
}

/**
 * Registers the node-level gauge {@code nodes_shards_number} in the metric catalog.
 */
private void registerNumberOfShardsPerNode() {
    final String metricName = "nodes_shards_number";
    final String metricHelp = "node shards";
    catalog.registerNodeGauge(metricName, metricHelp);
}
/**
 * Publishes the number of shards currently assigned to {@code node} as the
 * {@code nodes_shards_number} gauge.
 *
 * <p>The count is taken from the routing table of the supplied cluster state: every shard
 * that is assigned to a node and whose current node id equals {@code node.getId()} is counted.
 * Unassigned shards are not counted.
 *
 * <p>When no cluster state is available (the response, or the state it carries, is
 * {@code null}) the gauge is left untouched instead of failing the whole metrics update
 * with a {@link NullPointerException}.
 *
 * @param nodeInfo             node name / node id tuple identifying the gauge to set
 * @param clusterStateResponse cluster state response providing the routing table;
 *                             may be {@code null}
 * @param node                 node whose shards are counted
 */
public void updateNumberOfShardsPerNode(Tuple<String, String> nodeInfo,
                                        ClusterStateResponse clusterStateResponse,
                                        DiscoveryNode node) {
    // The caller passes this argument as nullable; without a usable state there is nothing to report.
    if (clusterStateResponse == null || clusterStateResponse.getState() == null) {
        return;
    }

    final String targetNodeId = node.getId();
    int shardCount = 0;
    // Single pass over the routing table; only shards hosted by the requested node are
    // relevant, so no per-node map of the whole cluster is built.
    for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
        if (shard.assignedToNode() && targetNodeId.equals(shard.currentNodeId())) {
            shardCount++;
        }
    }

    catalog.setNodeGauge(nodeInfo, "nodes_shards_number", shardCount);
}



/**
* Get the metric catalog.
* @return The catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import org.compuscene.metrics.prometheus.PrometheusSettings;
import org.opensearch.action.NodePrometheusMetricsRequest;
import org.opensearch.action.NodePrometheusMetricsResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.core.action.ActionListener;
import org.opensearch.OpenSearchException;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -65,7 +69,7 @@ public class RestPrometheusMetricsAction extends BaseRestHandler {
private final String metricPrefix;
private final PrometheusSettings prometheusSettings;
private final Logger logger = LogManager.getLogger(getClass());

private ClusterStateResponse clusterStateResponse = null;
/**
* A constructor.
* @param settings Settings
Expand All @@ -91,6 +95,18 @@ public String getName() {
return "prometheus_metrics_action";
}

/**
 * Listener for the cluster state request; stores each successful response in the
 * {@code clusterStateResponse} field and logs failures.
 *
 * NOTE(review): the field is assigned from this callback and read while the metrics
 * response is built; presumably these can run on different threads and the request may
 * not have completed when the field is read -- confirm and consider fetching the state
 * as part of the metrics transport action instead.
 */
private final ActionListener<ClusterStateResponse> clusterStateResponseActionListener =
    new ActionListener<ClusterStateResponse>() {
        @Override
        public void onResponse(ClusterStateResponse response) {
            clusterStateResponse = response;
        }

        @Override
        public void onFailure(Exception e) {
            // The previously stored response (if any) is kept, but the failure must not
            // disappear silently: surface it in the node log.
            logger.warn("Failed to retrieve cluster state for Prometheus shard metrics", e);
        }
    };

// This method does not throw any IOException because there are no request parameters to be parsed
// and processed. This may change in the future.
@Override
Expand All @@ -113,7 +129,8 @@ public RestResponse buildResponse(NodePrometheusMetricsResponse response) throws
assert response.getLocalNodesInfoResponse().getNodes().size() == 1;
String nodeName = response.getLocalNodesInfoResponse().getNodes().get(0).getNode().getName();
String nodeId = response.getLocalNodesInfoResponse().getNodes().get(0).getNode().getId();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
client.admin().cluster().state(clusterStateRequest, clusterStateResponseActionListener);
if (logger.isTraceEnabled()) {
logger.trace("Preparing metrics output on node: [{}], [{}]", nodeName, nodeId);
}
Expand All @@ -130,7 +147,7 @@ public RestResponse buildResponse(NodePrometheusMetricsResponse response) throws
collector.registerMetrics();
collector.updateMetrics(
nodeName, nodeId, response.getClusterHealth(), response.getNodeStats(),
response.getIndicesStats(), response.getClusterStatsData());
response.getIndicesStats(), response.getClusterStatsData(),clusterStateResponse);
textContent = collector.getTextContent();
} catch (Exception ex) {
// We use try-catch block to catch exception from Prometheus catalog and collector processing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"Cluster nodes shards (nodes_shards_number)":

# Precondition: refresh and assert that the cluster reports zero indices.
- do:
indices.refresh: { allow_no_indices: true }

- do:
cluster.stats: {}

- match: { indices.count: 0 }

# Index a single document into the `twitter` index.
# NOTE(review): `settings` below is part of the document body passed to the index call;
# presumably it is stored as an ordinary document field and does not configure the
# index's number of shards -- confirm, and create the index explicitly if 5 shards are intended.
- do:
index:
index: twitter
id: 1
body: { foo: bar, settings: { number_of_shards: 5 } }

- do:
indices.refresh: { allow_no_indices: true }

# Fetch the Prometheus metrics output.
- do:
prometheus.metrics: {}

# Expect an opensearch_nodes_shards_number sample labelled with cluster, node and nodeid,
# followed by a decimal value. The pattern accepts any value; it does not check the shard count.
- match:
$body: |
/.*
opensearch_nodes_shards_number\{
cluster="yamlRestTest",node="[a-zA-Z0-9\-\.\_]+",nodeid="[a-zA-Z0-9\-\.\_]+"
\,} \s \d+\.\d+ (\n\#|![\n])
.*/
Loading