Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ void testPerSplitWatermark(boolean emitRecordBeforeSplitAddition) throws Excepti
sourceOperator.handleOperatorEvent(addSplitsEvent);

// First 3 records from split A should not generate any watermarks
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> {
try {
sourceOperator.emitNext(output);
Expand All @@ -298,7 +298,7 @@ void testPerSplitWatermark(boolean emitRecordBeforeSplitAddition) throws Excepti
"%d out of 3 records are received within timeout", output.numRecords));
assertThat(output.watermarks).isEmpty();

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> {
try {
sourceOperator.emitNext(output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public int runUntilRecordsEmitted(
throws Exception {
final AtomicReference<Exception> exception = new AtomicReference<>();
final AtomicInteger numFetches = new AtomicInteger();
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> {
try {
this.fetcherManager.runEachOnce();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void testResetToCheckpointTimeout() throws Exception {
(RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs);

coordinator.resetToCheckpoint(2L, new byte[0]);
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
context::isJobFailed,
Duration.ofSeconds(5),
"The job should fail due to resetToCheckpoint() timeout.");
Expand Down Expand Up @@ -260,7 +260,7 @@ void testConsecutiveResetToCheckpoint() throws Exception {
}
coordinator.close();
TestingOperatorCoordinator internalCoordinator = getInternalCoordinator(coordinator);
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
internalCoordinator::isClosed,
Duration.ofSeconds(5),
"Timed out when waiting for the coordinator to close.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void testCallAsyncExceptionFailsJob() throws Exception {
(ignored, e) -> {
throw new RuntimeException();
});
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
context::isJobFailed,
Duration.ofSeconds(10L),
"The job did not fail before timeout.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
import static org.apache.flink.core.testutils.CommonTestUtils.waitUntil;
import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -300,7 +300,7 @@ public void start() {
null)) {

coordinator.start();
waitUtil(
waitUntil(
() -> operatorCoordinatorContext.isJobFailed(),
Duration.ofSeconds(10),
"The job should have failed due to the artificial exception.");
Expand Down Expand Up @@ -356,7 +356,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
coordinator.start();
coordinator.handleEventFromOperator(1, 0, new SourceEventWrapper(new SourceEvent() {}));

waitUtil(
waitUntil(
() -> operatorCoordinatorContext.isJobFailed(),
Duration.ofSeconds(10),
"The job should have failed due to the artificial exception.");
Expand Down Expand Up @@ -413,7 +413,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
})
.get();

waitUtil(
waitUntil(
splitEnumerator::closed,
Duration.ofSeconds(5),
"Split enumerator was not closed in 5 seconds.");
Expand Down Expand Up @@ -532,7 +532,7 @@ public void testSubtaskRestartAndRequestSplitsAgain() throws Exception {
sourceCoordinator.executionAttemptFailed(0, attemptNumber, null);
sourceCoordinator.subtaskReset(0, 99L);

waitUtilNumberReached(() -> getEnumerator().getUnassignedSplits().size(), 2);
waitUntilNumberReached(() -> getEnumerator().getUnassignedSplits().size(), 2);

attemptNumber++;
setReaderTaskReady(sourceCoordinator, 0, attemptNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ protected void waitForCoordinatorToProcessActions() {
}

void waitForSentEvents(int expectedEventNumber) throws Exception {
waitUtilNumberReached(() -> receivingTasks.getNumberOfSentEvents(), expectedEventNumber);
waitUntilNumberReached(() -> receivingTasks.getNumberOfSentEvents(), expectedEventNumber);
}

static void waitUtilNumberReached(Supplier<Integer> numberSupplier, int expectedNumber)
static void waitUntilNumberReached(Supplier<Integer> numberSupplier, int expectedNumber)
throws Exception {
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> numberSupplier.get() == expectedNumber,
Duration.ofDays(1),
"Not reach expected number within timeout.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public void createAndVerifyCreateMaterializedTableWithData(
awaitOperationTermination(service, sessionHandle, materializedTableHandle);

// verify data exists in materialized table
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
fetchTableData(
sessionHandle,
Expand Down Expand Up @@ -318,7 +318,7 @@ public void verifyRefreshJobCreated(
assertThat(jobDetailsInfo.getJobType()).isEqualTo(JobType.BATCH);

// 3. verify the new job is finished
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> {
try {
return JobStatus.FINISHED.equals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ void testConfigureSqlGateway() throws Exception {
.newThread(() -> SqlGateway.startSqlGateway(stream, args));
thread.start();

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> MockedSqlGatewayEndpoint.isRunning(id),
Duration.ofSeconds(10),
"Failed to get the endpoint starts.");

thread.interrupt();
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> !thread.isAlive(),
Duration.ofSeconds(10),
"Failed to get the endpoint starts.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void testStaticPartitionRefreshMaterializedTableViaRestAPI() throws Exception {
OperationHandle operationHandle =
new OperationHandle(UUID.fromString(response.getOperationHandle()));

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
SQL_GATEWAY_SERVICE_EXTENSION
.getService()
Expand Down Expand Up @@ -198,7 +198,7 @@ void testPeriodicRefreshMaterializedTableViaRestAPI() throws Exception {
OperationHandle operationHandle =
new OperationHandle(UUID.fromString(response.getOperationHandle()));

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
SQL_GATEWAY_SERVICE_EXTENSION
.getService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ protected String runSingleStatement(String statement) throws Exception {
.getOperationManager()
.getOperation(operationHandle));

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
SQL_GATEWAY_SERVICE_EXTENSION
.getService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ void testAlterMaterializedTableRefresh() throws Exception {
verifyRefreshJobCreated(restClusterClient, jobId, currentTime);

// 2. verify the new job overwrite the data
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size()
== data.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ void testOperationGetErrorAndFetchError() throws Exception {
});
startRunningLatch.await();

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
service.getOperationInfo(sessionHandle, operationHandle)
.getStatus()
Expand Down Expand Up @@ -840,7 +840,7 @@ void testCancelAndCloseOperationInParallel() throws Exception {
executor.submit(() -> service.closeOperation(sessionHandle, operationHandle));
}

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
service.getSession(sessionHandle).getOperationManager().getOperationCount()
== 0,
Expand Down Expand Up @@ -965,7 +965,7 @@ void testReleaseLockWhenFailedToSubmitOperation() throws Exception {
success.countDown();
return getDefaultResultSet();
});
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> success.getCount() == 0, Duration.ofSeconds(10), "Should come to end.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void after() {
protected String runSingleStatement(String statement) throws Exception {
OperationHandle operationHandle =
service.executeStatement(sessionHandle, statement, -1, new Configuration());
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
service.getOperationInfo(sessionHandle, operationHandle)
.getStatus()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void testCancelUninterruptedOperation() throws Exception {
isRunning.compareAndSet(false, true);
}
});
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
isRunning::get, Duration.ofSeconds(10), "Failed to start up the task.");
assertThatThrownBy(() -> operationManager.cancelOperation(operationHandle))
.satisfies(
Expand Down Expand Up @@ -176,7 +176,7 @@ void testCloseUninterruptedOperation() throws Exception {
});
});
}
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
isRunning::get, Duration.ofSeconds(10), "Failed to start up the task.");

assertThatThrownBy(() -> operationManager.close())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void testFetchResultsMultipleTimesWithLimitedFetchSizeInOrientation() {
void testFetchResultInParallel() throws Exception {
ResultFetcher fetcher =
buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> fetcher.getResultStore().getBufferedRecordSize() > 0,
Duration.ofSeconds(10),
"Failed to wait the buffer has data.");
Expand Down Expand Up @@ -328,7 +328,7 @@ void testFetchResultAfterClose() throws Exception {
meetEnd.set(true);
});

CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
meetEnd::get,
Duration.ofSeconds(10),
"Should get EOS when fetch results from the closed fetcher.");
Expand Down Expand Up @@ -399,7 +399,7 @@ void testFetchIllegalToken() {
void testFetchBeforeWithDifferentSize() throws Exception {
ResultFetcher fetcher =
buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> fetcher.getResultStore().getBufferedRecordSize() > 1,
Duration.ofSeconds(10),
"Failed to make cached records num larger than 1.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ private void waitUntilAllTaskManagerConnected() throws InterruptedException, Tim
checkNotNull(
restClusterClient,
"REST cluster client should not be null when checking TaskManager status");
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() -> {
final ClusterOverviewWithVersion clusterOverview;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public static void assertThrows(
* @throws InterruptedException if the thread is interrupted.
*/
@SuppressWarnings("BusyWait")
public static void waitUtil(
public static void waitUntil(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should change the same typo in the comment if we are making this change.

Supplier<Boolean> condition, Duration timeout, Duration pause, String errorMsg)
throws TimeoutException, InterruptedException {
long timeoutMs = timeout.toMillis();
Expand Down Expand Up @@ -245,7 +245,7 @@ public static void waitUntilIgnoringExceptions(
}
};

waitUtil(safeCondition, timeout, pause, errorMsg);
waitUntil(safeCondition, timeout, pause, errorMsg);
}

/**
Expand All @@ -258,8 +258,8 @@ public static void waitUntilIgnoringExceptions(
* @throws TimeoutException if the condition is not met before timeout.
* @throws InterruptedException if the thread is interrupted.
*/
public static void waitUtil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
public static void waitUntil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
throws TimeoutException, InterruptedException {
waitUtil(condition, timeout, Duration.ofMillis(1), errorMsg);
waitUntil(condition, timeout, Duration.ofMillis(1), errorMsg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public Long map(Long value) throws Exception {

private static void waitUntilAtLeastOneTaskHasBeenDeployed(TestProcess taskManagerProcess)
throws InterruptedException, TimeoutException {
CommonTestUtils.waitUtil(
CommonTestUtils.waitUntil(
() ->
taskManagerProcess
.getProcessOutput()
Expand Down