Skip to content

Commit

Permalink
Make some constant SubscribableListener instances cheaper (#124452)
Browse files Browse the repository at this point in the history
We can just use a real constant for the `null` case, avoiding any
non-plain stores in all cases. This should be somewhat helpful for the
security interceptors.
  • Loading branch information
original-brownbear authored Mar 9, 2025
1 parent 425823c commit 29ac261
Show file tree
Hide file tree
Showing 16 changed files with 31 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ public TransportClusterStatsAction(
protected SubscribableListener<AdditionalStats> createActionContext(Task task, ClusterStatsRequest request) {
assert task instanceof CancellableTask;
final var cancellableTask = (CancellableTask) task;
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
if (request.isRemoteStats() == false) {
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
final AdditionalStats additionalStats = new AdditionalStats();
additionalStats.compute(cancellableTask, request, additionalStatsListener);
return additionalStatsListener;
} else {
// For remote stats request, we don't need to compute anything
additionalStatsListener.onResponse(null);
return SubscribableListener.nullSuccess();
}
return additionalStatsListener;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,4 +586,15 @@ private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Execu
private Object compareAndExchangeState(Object expectedValue, Object newValue) {
return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue);
}

@SuppressWarnings("rawtypes")
private static final SubscribableListener NULL_SUCCESS = newSucceeded(null);

/**
* Same as {@link #newSucceeded(Object)} but always returns the same instance with result value {@code null}.
*/
@SuppressWarnings("unchecked")
public static <T> SubscribableListener<T> nullSuccess() {
return NULL_SUCCESS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ protected void doClose() {}
* Kind of a hack tbh, we can't be sure the shard locks are fully released when this is completed so there's all sorts of retries and
* other lenience to handle that. It'd be better to wait for the shard locks to be released and then delete the data. See #74149.
*/
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.newSucceeded(null);
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.nullSuccess();

@Nullable // if not currently applying a cluster state
private RefCountingListener currentClusterStateShardsClosedListeners;
Expand Down Expand Up @@ -397,7 +397,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
);
} else if (project.isPresent() && project.get().hasIndex(index)) {
// The deleted index was part of the previous cluster state, but not loaded on the local node
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = project.get().index(index);
indexSettings = new IndexSettings(metadata, settings);
indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metadata, state);
Expand All @@ -411,7 +411,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
// previous cluster state is not initialized/recovered.
assert state.metadata().projects().values().stream().anyMatch(p -> p.indexGraveyard().containsIndex(index))
|| previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state());
if (metadata != null) {
indexSettings = new IndexSettings(metadata, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public SubscribableListener<Void> getClusterStateDelayListener(long clusterState
refCounted.decRef();
}
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ private void deleteTemplates(Set<String> excludeTemplates, ActionListener<Void>
SubscribableListener

// dummy start step for symmetry
.newSucceeded(null)

.nullSuccess()
// delete composable templates
.<GetComposableIndexTemplateAction.Response>andThen(getComposableTemplates::addListener)
.<AcknowledgedResponse>andThen((l, r) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ default IsBlockedResult isBlocked() {
return NOT_BLOCKED;
}

IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.newSucceeded(null), "not blocked");
IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.nullSuccess(), "not blocked");

/**
* A factory for creating intermediate operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
completionListenerRef.compareAndSet(null, SubscribableListener.nullSuccess());
}
listener.onResponse(r);
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ public SubscribableListener<Void> intercept(
}
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@ public SubscribableListener<Void> intercept(
}
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ && supports(indicesRequest)
return listener;
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}

abstract void disableFeatures(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ && hasRemoteIndices(searchRequest)
searchRequest.requestCache(false);
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}

// package private for test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indexSnapshotsVerified());
assertEquals(0L, status.blobsVerified());
assertEquals(0L, status.blobBytesVerified());
yield SubscribableListener.newSucceeded(null);
yield SubscribableListener.nullSuccess();
}
case INDEX_RESTORABILITY -> {
// several of these chunks might arrive concurrently; we want to verify the task status before processing any of
Expand All @@ -210,7 +210,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indicesVerified());
});
}
case SNAPSHOT_INFO -> SubscribableListener.newSucceeded(null);
case SNAPSHOT_INFO -> SubscribableListener.nullSuccess();
case ANOMALY -> fail(null, "should not see anomalies");
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ protected InputStream openSlice(int slice) throws IOException {
})));
} else {
blobBytesVerified.addAndGet(fileInfo.length());
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void unassignTransforms(ClusterState state, ActionListener<Void> listene

// chain each call one at a time
// because that is what we are doing for ML, and that is all that is supported in the persistentTasksClusterService (for now)
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.newSucceeded(null);
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.nullSuccess();
for (var task : transformTasks) {
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
Expand Down

0 comments on commit 29ac261

Please sign in to comment.