Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ Maximum object size in bytes that can be considered serializable in a function c

The corresponding session property is :ref:`admin/properties-session:\`\`max_serializable_object_size\`\``.

``cluster-tag``
^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** (none)

An optional identifier for the cluster. When set, this tag is included in the response from the
``/v1/cluster`` REST API endpoint, allowing clients to identify which cluster provided the response.

Memory Management Properties
----------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ServerConfig
private Duration clusterStatsExpirationDuration = new Duration(0, MILLISECONDS);
private boolean nestedDataSerializationEnabled = true;
private Duration clusterResourceGroupStateInfoExpirationDuration = new Duration(0, MILLISECONDS);
private String clusterTag;

public boolean isResourceManager()
{
Expand Down Expand Up @@ -240,4 +241,16 @@ public ServerConfig setClusterResourceGroupStateInfoExpirationDuration(Duration
this.clusterResourceGroupStateInfoExpirationDuration = clusterResourceGroupStateInfoExpirationDuration;
return this;
}

public String getClusterTag()
{
return clusterTag;
}

@Config("cluster-tag")
public ServerConfig setClusterTag(String clusterTag)
{
this.clusterTag = clusterTag;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class DistributedClusterStatsResource
private final ResourceManagerClusterStateProvider clusterStateProvider;
private final InternalNodeManager internalNodeManager;
private final Supplier<ClusterStats> clusterStatsSupplier;
private final String clusterTag;

@Inject
public DistributedClusterStatsResource(
Expand All @@ -61,7 +62,9 @@ public DistributedClusterStatsResource(
this.isIncludeCoordinator = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null").isIncludeCoordinator();
this.clusterStateProvider = requireNonNull(clusterStateProvider, "nodeStateManager is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
Duration expirationDuration = requireNonNull(serverConfig, "serverConfig is null").getClusterStatsExpirationDuration();
ServerConfig config = requireNonNull(serverConfig, "serverConfig is null");
this.clusterTag = config.getClusterTag();
Duration expirationDuration = config.getClusterStatsExpirationDuration();
this.clusterStatsSupplier = expirationDuration.getValue() > 0 ? memoizeWithExpiration(this::calculateClusterStats, expirationDuration.toMillis(), MILLISECONDS) : this::calculateClusterStats;
}

Expand Down Expand Up @@ -126,7 +129,8 @@ else if (query.getState() == QueryState.RUNNING) {
totalInputRows,
totalInputBytes,
totalCpuTimeSecs,
clusterStateProvider.getAdjustedQueueSize());
clusterStateProvider.getAdjustedQueueSize(),
clusterTag);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class ClusterStatsResource
private final InternalResourceGroupManager internalResourceGroupManager;
private final ClusterTtlProviderManager clusterTtlProviderManager;
private final Supplier<ClusterStats> clusterStatsSupplier;
private final String clusterTag;

@Inject
public ClusterStatsResource(
Expand All @@ -96,7 +97,9 @@ public ClusterStatsResource(
this.proxyHelper = requireNonNull(proxyHelper, "internalNodeManager is null");
this.internalResourceGroupManager = requireNonNull(internalResourceGroupManager, "internalResourceGroupManager is null");
this.clusterTtlProviderManager = requireNonNull(clusterTtlProviderManager, "clusterTtlProvider is null");
Duration expirationDuration = requireNonNull(serverConfig, "serverConfig is null").getClusterStatsExpirationDuration();
ServerConfig config = requireNonNull(serverConfig, "serverConfig is null");
this.clusterTag = config.getClusterTag();
Duration expirationDuration = config.getClusterStatsExpirationDuration();
this.clusterStatsSupplier = expirationDuration.getValue() > 0 ? memoizeWithExpiration(this::calculateClusterStats, expirationDuration.toMillis(), MILLISECONDS) : this::calculateClusterStats;
}

Expand Down Expand Up @@ -170,7 +173,8 @@ else if (query.getState() == QueryState.RUNNING) {
totalInputRows,
totalInputBytes,
totalCpuTimeSecs,
internalResourceGroupManager.getQueriesQueuedOnInternal());
internalResourceGroupManager.getQueriesQueuedOnInternal(),
clusterTag);
}

@GET
Expand Down Expand Up @@ -238,6 +242,8 @@ public static class ClusterStats
private final long totalCpuTimeSecs;
private final long adjustedQueueSize;

private final String clusterTag;

@JsonCreator
@ThriftConstructor
public ClusterStats(
Expand All @@ -251,7 +257,8 @@ public ClusterStats(
@JsonProperty("totalInputRows") long totalInputRows,
@JsonProperty("totalInputBytes") long totalInputBytes,
@JsonProperty("totalCpuTimeSecs") long totalCpuTimeSecs,
@JsonProperty("adjustedQueueSize") long adjustedQueueSize)
@JsonProperty("adjustedQueueSize") long adjustedQueueSize,
@JsonProperty("clusterTag") String clusterTag)
{
this.runningQueries = runningQueries;
this.blockedQueries = blockedQueries;
Expand All @@ -264,6 +271,7 @@ public ClusterStats(
this.totalInputBytes = totalInputBytes;
this.totalCpuTimeSecs = totalCpuTimeSecs;
this.adjustedQueueSize = adjustedQueueSize;
this.clusterTag = clusterTag;
}

@JsonProperty
Expand Down Expand Up @@ -342,5 +350,12 @@ public long getAdjustedQueueSize()
{
return adjustedQueueSize;
}

@JsonProperty
@ThriftField(12)
public String getClusterTag()
{
return clusterTag;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ else if (serverConfig.isCoordinator()) {

install(new InternalCommunicationModule());

configBinder(binder).bindConfig(ServerConfig.class);
configBinder(binder).bindConfig(FeaturesConfig.class);
configBinder(binder).bindConfig(FunctionsConfig.class);
configBinder(binder).bindConfig(JavaFeaturesConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class TestThriftClusterStats
public static final long TOTAL_INPUT_BYTES = 1003;
public static final long TOTAL_CPU_TIME_SECS = 1004;
public static final long ADJUSTED_QUEUE_SIZE = 1005;
public static final String CLUSTER_TAG = "test-cluster";
private static final ThriftCodecManager COMPILER_READ_CODEC_MANAGER = new ThriftCodecManager(new CompilerThriftCodecFactory(false));
private static final ThriftCodecManager COMPILER_WRITE_CODEC_MANAGER = new ThriftCodecManager(new CompilerThriftCodecFactory(false));
private static final ThriftCodec<ClusterStats> COMPILER_READ_CODEC = COMPILER_READ_CODEC_MANAGER.getCodec(ClusterStats.class);
Expand Down Expand Up @@ -111,6 +112,7 @@ private void assertSerde(ClusterStats clusterStats)
assertEquals(clusterStats.getTotalInputBytes(), TOTAL_INPUT_BYTES);
assertEquals(clusterStats.getTotalCpuTimeSecs(), TOTAL_CPU_TIME_SECS);
assertEquals(clusterStats.getAdjustedQueueSize(), ADJUSTED_QUEUE_SIZE);
assertEquals(clusterStats.getClusterTag(), CLUSTER_TAG);
}

private ClusterStats getRoundTripSerialize(ThriftCodec<ClusterStats> readCodec, ThriftCodec<ClusterStats> writeCodec, Function<TTransport, TProtocol> protocolFactory)
Expand All @@ -134,6 +136,7 @@ private ClusterStats getClusterStats()
TOTAL_INPUT_ROWS,
TOTAL_INPUT_BYTES,
TOTAL_CPU_TIME_SECS,
ADJUSTED_QUEUE_SIZE);
ADJUSTED_QUEUE_SIZE,
CLUSTER_TAG);
}
}
Loading
Loading