Skip to content

Commit 182744d

Browse files
committed
Migrate JDBC plugins to page source API
This enables dynamic row filtering and columnar filter evaluation optimizations on JDBC based sources. We're also able to use io.trino.plugin.jdbc.JdbcPageSource#isBlocked to yield when waiting for results from source DB. RemoteQueryCancellation is removed because all jdbc queries are now executed on a separate thread pool which allows remote query cancellation.
1 parent f87802f commit 182744d

18 files changed

+232
-578
lines changed

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.google.common.collect.ImmutableSet;
1717
import io.airlift.configuration.Config;
1818
import io.airlift.configuration.ConfigDescription;
19+
import io.airlift.configuration.DefunctConfig;
1920
import io.airlift.units.Duration;
2021
import io.airlift.units.MinDuration;
2122
import jakarta.validation.constraints.AssertTrue;
@@ -29,6 +30,7 @@
2930
import static jakarta.validation.constraints.Pattern.Flag.CASE_INSENSITIVE;
3031
import static java.util.concurrent.TimeUnit.SECONDS;
3132

33+
@DefunctConfig("remote-query-async-cancellation.enabled")
3234
public class BaseJdbcConfig
3335
{
3436
private static final String METADATA_CACHE_TTL = "metadata.cache-ttl";

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSplitSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
* Attaches dynamic filter to {@link JdbcSplit} after {@link JdbcDynamicFilteringSplitManager}
2828
* has waited for the collection of dynamic filters.
2929
* This allows JDBC based connectors to avoid waiting for dynamic filters again on the worker node
30-
* in {@link JdbcRecordSetProvider}. The number of splits generated by JDBC based connectors are
30+
* in {@link JdbcPageSourceProvider}. The number of splits generated by JDBC based connectors are
3131
* typically small, therefore attaching dynamic filter here does not add significant overhead.
3232
* Waiting for dynamic filters in {@link JdbcDynamicFilteringSplitManager} is preferred over waiting
33-
* for them on the worker node in {@link JdbcRecordSetProvider} to allow connectors to take advantage of
33+
* for them on the worker node in {@link JdbcPageSourceProvider} to allow connectors to take advantage of
3434
* dynamic filters during the splits generation phase.
3535
*/
3636
public class DynamicFilteringJdbcSplitSource

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForRecordCursor.java renamed to plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForJdbcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@
2626
@Retention(RUNTIME)
2727
@Target({FIELD, PARAMETER, METHOD})
2828
@BindingAnnotation
29-
public @interface ForRecordCursor
29+
public @interface ForJdbcClient
3030
{
3131
}

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,13 @@
1313
*/
1414
package io.trino.plugin.jdbc;
1515

16-
import com.google.common.util.concurrent.MoreExecutors;
1716
import com.google.inject.Binder;
18-
import com.google.inject.Inject;
1917
import com.google.inject.Key;
2018
import com.google.inject.Provider;
19+
import com.google.inject.Provides;
2120
import com.google.inject.Scopes;
2221
import com.google.inject.Singleton;
2322
import com.google.inject.multibindings.Multibinder;
24-
import com.google.inject.multibindings.ProvidesIntoOptional;
25-
import dev.failsafe.RetryPolicy;
2623
import io.airlift.configuration.AbstractConfigurationAwareModule;
2724
import io.trino.plugin.base.mapping.IdentifierMappingModule;
2825
import io.trino.plugin.base.session.SessionPropertiesProvider;
@@ -33,7 +30,6 @@
3330
import io.trino.spi.connector.ConnectorAccessControl;
3431
import io.trino.spi.connector.ConnectorPageSinkProvider;
3532
import io.trino.spi.connector.ConnectorPageSourceProvider;
36-
import io.trino.spi.connector.ConnectorRecordSetProvider;
3733
import io.trino.spi.connector.ConnectorSplitManager;
3834
import io.trino.spi.function.table.ConnectorTableFunction;
3935
import io.trino.spi.procedure.Procedure;
@@ -42,10 +38,12 @@
4238

4339
import static com.google.inject.multibindings.Multibinder.newSetBinder;
4440
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
45-
import static com.google.inject.multibindings.ProvidesIntoOptional.Type.DEFAULT;
41+
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
4642
import static io.airlift.configuration.ConditionalModule.conditionalModule;
4743
import static io.airlift.configuration.ConfigBinder.configBinder;
4844
import static io.trino.plugin.base.ClosingBinder.closingBinder;
45+
import static java.lang.String.format;
46+
import static java.util.concurrent.Executors.newCachedThreadPool;
4947
import static org.weakref.jmx.guice.ExportBinder.newExporter;
5048

5149
public class JdbcModule
@@ -109,15 +107,10 @@ public void setup(Binder binder)
109107

110108
newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class));
111109

112-
newOptionalBinder(binder, Key.get(ExecutorService.class, ForRecordCursor.class))
113-
.setDefault()
114-
.toProvider(MoreExecutors::newDirectExecutorService)
115-
.in(Scopes.SINGLETON);
116-
117110
newSetBinder(binder, JdbcQueryEventListener.class);
118111

119112
closingBinder(binder)
120-
.registerExecutor(Key.get(ExecutorService.class, ForRecordCursor.class));
113+
.registerExecutor(Key.get(ExecutorService.class, ForJdbcClient.class));
121114
}
122115

123116
public static Multibinder<SessionPropertiesProvider> sessionPropertiesProviderBinder(Binder binder)
@@ -150,11 +143,11 @@ public static void bindTablePropertiesProvider(Binder binder, Class<? extends Ta
150143
tablePropertiesProviderBinder(binder).addBinding().to(type).in(Scopes.SINGLETON);
151144
}
152145

153-
@ProvidesIntoOptional(DEFAULT)
154-
@Inject
146+
@Provides
155147
@Singleton
156-
ConnectorRecordSetProvider recordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy<Object> policy)
148+
@ForJdbcClient
149+
public ExecutorService provideJdbcClientExecutor(CatalogName catalogName)
157150
{
158-
return new JdbcRecordSetProvider(jdbcClient, executor, policy);
151+
return newCachedThreadPool(daemonThreadsNamed(format("%s-jdbc-client-%%d", catalogName)));
159152
}
160153
}

0 commit comments

Comments
 (0)