Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand All @@ -37,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -54,21 +57,27 @@ public class DataDefinitionExecution<T extends Statement>
private final QueryStateMachine stateMachine;
private final List<Expression> parameters;
private final WarningCollector warningCollector;
private final ListeningExecutorService executor;

// Track the running future so it can be cancelled
private volatile ListenableFuture<Void> runningFuture;

private DataDefinitionExecution(
DataDefinitionTask<T> task,
T statement,
Slug slug,
QueryStateMachine stateMachine,
List<Expression> parameters,
WarningCollector warningCollector)
WarningCollector warningCollector,
ExecutorService executor)
{
this.task = requireNonNull(task, "task is null");
this.statement = requireNonNull(statement, "statement is null");
this.slug = requireNonNull(slug, "slug is null");
this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");
this.parameters = parameters;
this.warningCollector = requireNonNull(warningCollector, "warningCollector is null");
this.executor = MoreExecutors.listeningDecorator(requireNonNull(executor, "executor is null"));
stateMachine.addStateChangeListener(state -> {
if (state.isDone() && stateMachine.getFinalQueryInfo().isEmpty()) {
// make sure the final query info is set and listeners are triggered
Expand Down Expand Up @@ -149,8 +158,12 @@ public void start()
return;
}

ListenableFuture<Void> future = task.execute(statement, stateMachine, parameters, warningCollector);
Futures.addCallback(future, new FutureCallback<>()
// Execute the DDL task asynchronously to make it cancellable
runningFuture = executor.submit(() -> {
ListenableFuture<Void> taskFuture = task.execute(statement, stateMachine, parameters, warningCollector);
return taskFuture.get();
});
Futures.addCallback(runningFuture, new FutureCallback<>()
{
@Override
public void onSuccess(@Nullable Void result)
Expand Down Expand Up @@ -222,6 +235,11 @@ public boolean isDone()
@Override
public void cancelQuery()
{
// Cancel the running future if it exists
ListenableFuture<Void> future = runningFuture;
if (future != null) {
future.cancel(true);
}
stateMachine.transitionToCanceled();
}

Expand Down Expand Up @@ -306,11 +324,15 @@ public static class DataDefinitionExecutionFactory
implements QueryExecutionFactory<DataDefinitionExecution<?>>
{
private final Map<Class<? extends Statement>, DataDefinitionTask<?>> tasks;
private final ExecutorService executor;

@Inject
public DataDefinitionExecutionFactory(Map<Class<? extends Statement>, DataDefinitionTask<?>> tasks)
public DataDefinitionExecutionFactory(
Map<Class<? extends Statement>, DataDefinitionTask<?>> tasks,
@ForQueryExecution ExecutorService executor)
{
this.tasks = requireNonNull(tasks, "tasks is null");
this.executor = requireNonNull(executor, "executor is null");
}

@Override
Expand All @@ -336,7 +358,7 @@ private <T extends Statement> DataDefinitionExecution<T> createDataDefinitionExe
checkArgument(task != null, "no task for statement: %s", statement.getClass().getSimpleName());

stateMachine.setUpdateType(task.getName());
return new DataDefinitionExecution<>(task, statement, slug, stateMachine, parameters, warningCollector);
return new DataDefinitionExecution<>(task, statement, slug, stateMachine, parameters, warningCollector, executor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.trino.execution.warnings.WarningCollector;
import io.trino.metadata.QualifiedObjectName;
import io.trino.security.AllowAllAccessControl;
import io.trino.server.protocol.Slug;
import io.trino.sql.tree.DropTable;
import io.trino.sql.tree.NodeLocation;
import io.trino.sql.tree.QualifiedName;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
Expand Down Expand Up @@ -127,8 +133,44 @@ public void testDropTableIfExistsOnMaterializedView()
.hasMessageContaining("Table '%s' does not exist, but a materialized view with that name exists. Did you mean DROP MATERIALIZED VIEW %s?", viewName, viewName);
}

@Test
public void testDropTableIsCancellable()
throws Exception
{
QualifiedObjectName tableName = qualifiedObjectName("existing_table");
metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL);
assertThat(metadata.getTableHandle(testSession, tableName)).isPresent();

// Test that drop table operation can be cancelled by using a slow executor
ListeningExecutorService slowExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
try {
DropTable dropTable = new DropTable(new NodeLocation(1, 1), asQualifiedName(tableName), false);
DropTableTask task = new DropTableTask(metadata, new AllowAllAccessControl());

// Simulate cancellation by creating a future that can be cancelled
ListenableFuture<Void> future = slowExecutor.submit(() -> {
try {
Thread.sleep(1000); // Simulate slow operation
return task.execute(dropTable, queryStateMachine, ImmutableList.of(), WarningCollector.NOOP).get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Cancelled", e);
}
});

// Cancel the future
boolean cancelled = future.cancel(true);
assertThat(cancelled).isTrue();
}
finally {
slowExecutor.shutdown();
}
}

private ListenableFuture<Void> executeDropTable(QualifiedName tableName, boolean exists)
{
return new DropTableTask(metadata, new AllowAllAccessControl()).execute(new DropTable(new NodeLocation(1, 1), tableName, exists), queryStateMachine, ImmutableList.of(), WarningCollector.NOOP);
return new DropTableTask(metadata, new AllowAllAccessControl())
.execute(new DropTable(new NodeLocation(1, 1), tableName, exists), queryStateMachine, ImmutableList.of(), WarningCollector.NOOP);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -701,13 +701,30 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
throw new TrinoException(HIVE_METASTORE_ERROR, e);
}
try {
// Check for interruption before starting the potentially long-running operation
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Drop table operation was cancelled");
}
dropTableData(table.io(), table.operations().current());
}
catch (InterruptedException e) {
// Restore interrupted status and exit gracefully
Thread.currentThread().interrupt();
throw new RuntimeException("Drop table operation was cancelled", e);
}
catch (RuntimeException e) {
// If the snapshot file is not found, an exception will be thrown by the dropTableData function.
// So log the exception and continue with deleting the table location
LOG.warn(e, "Failed to delete table data referenced by metadata");
}

// Check for interruption before final cleanup
if (Thread.currentThread().isInterrupted()) {
LOG.info("Drop table operation was cancelled during cleanup for table: %s", schemaTableName);
Thread.currentThread().interrupt();
throw new RuntimeException("Drop table operation was cancelled during cleanup");
}

deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
invalidateTableCache(schemaTableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,15 +426,32 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
schemaTableName.getTableName(),
false /* do not delete data */);
try {
// Check for interruption before starting the potentially long-running operation
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Drop table operation was cancelled");
}
// Use the Iceberg routine for dropping the table data because the data files
// of the Iceberg table may be located in different locations
dropTableData(table.io(), metadata);
}
catch (InterruptedException e) {
// Restore interrupted status and exit gracefully
Thread.currentThread().interrupt();
throw new RuntimeException("Drop table operation was cancelled", e);
}
catch (RuntimeException e) {
// If the snapshot file is not found, an exception will be thrown by the dropTableData function.
// So log the exception and continue with deleting the table location
log.warn(e, "Failed to delete table data referenced by metadata");
}

// Check for interruption before final cleanup
if (Thread.currentThread().isInterrupted()) {
log.info("Drop table operation was cancelled during cleanup for table: %s", schemaTableName);
Thread.currentThread().interrupt();
throw new RuntimeException("Drop table operation was cancelled during cleanup");
}

deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, metastoreTable.getStorage().getLocation());
invalidateTableCache(schemaTableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,30 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)

jdbcCatalog.dropTable(toIdentifier(schemaTableName), false);
try {
// Check for interruption before starting the potentially long-running operation
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Drop table operation was cancelled");
}
dropTableData(table.io(), table.operations().current());
}
catch (InterruptedException e) {
// Restore interrupted status and exit gracefully
Thread.currentThread().interrupt();
throw new RuntimeException("Drop table operation was cancelled", e);
}
catch (RuntimeException e) {
// If the snapshot file is not found, an exception will be thrown by the dropTableData function.
// So log the exception and continue with deleting the table location
LOG.warn(e, "Failed to delete table data referenced by metadata");
}

// Check for interruption before final cleanup
if (Thread.currentThread().isInterrupted()) {
LOG.info("Drop table operation was cancelled during cleanup for table: %s", schemaTableName);
Thread.currentThread().interrupt();
throw new RuntimeException("Drop table operation was cancelled during cleanup");
}

deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
invalidateTableCache(schemaTableName);
}
Expand Down