Skip to content

Commit bfbff88

Browse files
committed
Migrate JDBC to page source API
1 parent 40f61e7 commit bfbff88

File tree

11 files changed

+215
-422
lines changed

11 files changed

+215
-422
lines changed

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSplitSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
* Attaches dynamic filter to {@link JdbcSplit} after {@link JdbcDynamicFilteringSplitManager}
2828
* has waited for the collection of dynamic filters.
2929
* This allows JDBC based connectors to avoid waiting for dynamic filters again on the worker node
30-
* in {@link JdbcRecordSetProvider}. The number of splits generated by JDBC based connectors are
30+
* in {@link JdbcPageSourceProvider}. The number of splits generated by JDBC based connectors are
3131
* typically small, therefore attaching dynamic filter here does not add significant overhead.
3232
* Waiting for dynamic filters in {@link JdbcDynamicFilteringSplitManager} is preferred over waiting
33-
* for them on the worker node in {@link JdbcRecordSetProvider} to allow connectors to take advantage of
33+
* for them on the worker node in {@link JdbcPageSourceProvider} to allow connectors to take advantage of
3434
* dynamic filters during the splits generation phase.
3535
*/
3636
public class DynamicFilteringJdbcSplitSource

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@
1515

1616
import com.google.common.util.concurrent.MoreExecutors;
1717
import com.google.inject.Binder;
18-
import com.google.inject.Inject;
1918
import com.google.inject.Key;
2019
import com.google.inject.Provider;
2120
import com.google.inject.Scopes;
22-
import com.google.inject.Singleton;
2321
import com.google.inject.multibindings.Multibinder;
24-
import com.google.inject.multibindings.ProvidesIntoOptional;
25-
import dev.failsafe.RetryPolicy;
2622
import io.airlift.configuration.AbstractConfigurationAwareModule;
2723
import io.trino.plugin.base.mapping.IdentifierMappingModule;
2824
import io.trino.plugin.base.session.SessionPropertiesProvider;
@@ -33,7 +29,6 @@
3329
import io.trino.spi.connector.ConnectorAccessControl;
3430
import io.trino.spi.connector.ConnectorPageSinkProvider;
3531
import io.trino.spi.connector.ConnectorPageSourceProvider;
36-
import io.trino.spi.connector.ConnectorRecordSetProvider;
3732
import io.trino.spi.connector.ConnectorSplitManager;
3833
import io.trino.spi.function.table.ConnectorTableFunction;
3934
import io.trino.spi.procedure.Procedure;
@@ -42,7 +37,6 @@
4237

4338
import static com.google.inject.multibindings.Multibinder.newSetBinder;
4439
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
45-
import static com.google.inject.multibindings.ProvidesIntoOptional.Type.DEFAULT;
4640
import static io.airlift.configuration.ConditionalModule.conditionalModule;
4741
import static io.airlift.configuration.ConfigBinder.configBinder;
4842
import static io.trino.plugin.base.ClosingBinder.closingBinder;
@@ -149,12 +143,4 @@ public static void bindTablePropertiesProvider(Binder binder, Class<? extends Ta
149143
{
150144
tablePropertiesProviderBinder(binder).addBinding().to(type).in(Scopes.SINGLETON);
151145
}
152-
153-
@ProvidesIntoOptional(DEFAULT)
154-
@Inject
155-
@Singleton
156-
ConnectorRecordSetProvider recordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy<Object> policy)
157-
{
158-
return new JdbcRecordSetProvider(jdbcClient, executor, policy);
159-
}
160146
}

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordCursor.java renamed to plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSource.java

Lines changed: 85 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
package io.trino.plugin.jdbc;
1515

1616
import com.google.common.base.VerifyException;
17+
import com.google.common.collect.ImmutableList;
1718
import io.airlift.log.Logger;
1819
import io.airlift.slice.Slice;
20+
import io.trino.spi.Page;
21+
import io.trino.spi.PageBuilder;
1922
import io.trino.spi.TrinoException;
23+
import io.trino.spi.block.BlockBuilder;
24+
import io.trino.spi.connector.ConnectorPageSource;
2025
import io.trino.spi.connector.ConnectorSession;
21-
import io.trino.spi.connector.RecordCursor;
2226
import io.trino.spi.type.Type;
2327
import jakarta.annotation.Nullable;
2428

@@ -28,25 +32,25 @@
2832
import java.sql.SQLException;
2933
import java.sql.Statement;
3034
import java.util.List;
31-
import java.util.concurrent.ExecutionException;
35+
import java.util.OptionalLong;
36+
import java.util.concurrent.CompletableFuture;
3237
import java.util.concurrent.ExecutorService;
33-
import java.util.concurrent.Future;
3438
import java.util.concurrent.atomic.AtomicLong;
3539

36-
import static com.google.common.base.Preconditions.checkArgument;
37-
import static com.google.common.base.Preconditions.checkState;
3840
import static com.google.common.base.Verify.verify;
41+
import static com.google.common.collect.ImmutableList.toImmutableList;
42+
import static io.airlift.concurrent.MoreFutures.getFutureValue;
3943
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
44+
import static java.lang.System.nanoTime;
4045
import static java.util.Objects.requireNonNull;
46+
import static java.util.concurrent.CompletableFuture.supplyAsync;
4147

42-
public class JdbcRecordCursor
43-
implements RecordCursor
48+
public class JdbcPageSource
49+
implements ConnectorPageSource
4450
{
45-
private static final Logger log = Logger.get(JdbcRecordCursor.class);
51+
private static final Logger log = Logger.get(JdbcPageSource.class);
4652

47-
private final ExecutorService executor;
48-
49-
private final JdbcColumnHandle[] columnHandles;
53+
private final List<JdbcColumnHandle> columnHandles;
5054
private final ReadFunction[] readFunctions;
5155
private final BooleanReadFunction[] booleanReadFunctions;
5256
private final DoubleReadFunction[] doubleReadFunctions;
@@ -58,16 +62,18 @@ public class JdbcRecordCursor
5862
private final Connection connection;
5963
private final PreparedStatement statement;
6064
private final AtomicLong readTimeNanos = new AtomicLong(0);
65+
private final PageBuilder pageBuilder;
66+
private final CompletableFuture<ResultSet> resultSetFuture;
6167
@Nullable
6268
private ResultSet resultSet;
69+
private boolean finished;
6370
private boolean closed;
71+
private long completedPositions;
6472

65-
public JdbcRecordCursor(JdbcClient jdbcClient, ExecutorService executor, ConnectorSession session, JdbcSplit split, BaseJdbcConnectorTableHandle table, List<JdbcColumnHandle> columnHandles)
73+
public JdbcPageSource(JdbcClient jdbcClient, ExecutorService executor, ConnectorSession session, JdbcSplit split, BaseJdbcConnectorTableHandle table, List<JdbcColumnHandle> columnHandles)
6674
{
6775
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
68-
this.executor = requireNonNull(executor, "executor is null");
69-
70-
this.columnHandles = columnHandles.toArray(new JdbcColumnHandle[0]);
76+
this.columnHandles = ImmutableList.copyOf(columnHandles);
7177

7278
readFunctions = new ReadFunction[columnHandles.size()];
7379
booleanReadFunctions = new BooleanReadFunction[columnHandles.size()];
@@ -84,7 +90,7 @@ public JdbcRecordCursor(JdbcClient jdbcClient, ExecutorService executor, Connect
8490
connection = jdbcClient.getConnection(session, split, (JdbcTableHandle) table);
8591
}
8692

87-
for (int i = 0; i < this.columnHandles.length; i++) {
93+
for (int i = 0; i < this.columnHandles.size(); i++) {
8894
JdbcColumnHandle columnHandle = columnHandles.get(i);
8995
ColumnMapping columnMapping = jdbcClient.toColumnMapping(session, connection, columnHandle.getJdbcTypeHandle())
9096
.orElseThrow(() -> new VerifyException("Column %s has unsupported type %s".formatted(columnHandle.getColumnName(), columnHandle.getJdbcTypeHandle())));
@@ -119,6 +125,22 @@ else if (javaType == Slice.class) {
119125
else {
120126
statement = jdbcClient.buildSql(session, connection, split, (JdbcTableHandle) table, columnHandles);
121127
}
128+
pageBuilder = new PageBuilder(columnHandles.stream()
129+
.map(JdbcColumnHandle::getColumnType)
130+
.collect(toImmutableList()));
131+
resultSetFuture = supplyAsync(() -> {
132+
long start = nanoTime();
133+
try {
134+
log.debug("Executing: %s", statement);
135+
return statement.executeQuery();
136+
}
137+
catch (SQLException e) {
138+
throw handleSqlException(e);
139+
}
140+
finally {
141+
readTimeNanos.addAndGet(nanoTime() - start);
142+
}
143+
}, executor);
122144
}
123145
catch (SQLException | RuntimeException e) {
124146
throw handleSqlException(e);
@@ -132,140 +154,82 @@ public long getReadTimeNanos()
132154
}
133155

134156
@Override
135-
public long getCompletedBytes()
157+
public boolean isFinished()
136158
{
137-
return 0;
159+
return finished;
138160
}
139161

140162
@Override
141-
public Type getType(int field)
163+
public Page getNextPage()
142164
{
143-
return columnHandles[field].getColumnType();
144-
}
145-
146-
@Override
147-
public boolean advanceNextPosition()
148-
{
149-
if (closed) {
150-
return false;
151-
}
152-
165+
verify(pageBuilder.isEmpty(), "Expected pageBuilder to be empty");
153166
try {
154167
if (resultSet == null) {
155-
long start = System.nanoTime();
156-
Future<ResultSet> resultSetFuture = executor.submit(() -> {
157-
log.debug("Executing: %s", statement);
158-
return statement.executeQuery();
159-
});
160-
try {
161-
// statement.executeQuery() may block uninterruptedly, using async way so we are able to cancel remote query
162-
// See javadoc of java.sql.Connection.setNetworkTimeout
163-
resultSet = resultSetFuture.get();
164-
}
165-
catch (ExecutionException e) {
166-
if (e.getCause() instanceof SQLException cause) {
167-
SQLException sqlException = new SQLException(cause.getMessage(), cause.getSQLState(), cause.getErrorCode(), e);
168-
if (cause.getNextException() != null) {
169-
sqlException.setNextException(cause.getNextException());
170-
}
171-
throw sqlException;
168+
resultSet = requireNonNull(getFutureValue(resultSetFuture), "resultSet is null");
169+
}
170+
171+
while (!pageBuilder.isFull() && resultSet.next()) {
172+
pageBuilder.declarePosition();
173+
completedPositions++;
174+
for (int i = 0; i < columnHandles.size(); i++) {
175+
BlockBuilder output = pageBuilder.getBlockBuilder(i);
176+
Type type = columnHandles.get(i).getColumnType();
177+
if (readFunctions[i].isNull(resultSet, i + 1)) {
178+
output.appendNull();
179+
}
180+
else if (booleanReadFunctions[i] != null) {
181+
type.writeBoolean(output, booleanReadFunctions[i].readBoolean(resultSet, i + 1));
182+
}
183+
else if (doubleReadFunctions[i] != null) {
184+
type.writeDouble(output, doubleReadFunctions[i].readDouble(resultSet, i + 1));
185+
}
186+
else if (longReadFunctions[i] != null) {
187+
type.writeLong(output, longReadFunctions[i].readLong(resultSet, i + 1));
188+
}
189+
else if (sliceReadFunctions[i] != null) {
190+
type.writeSlice(output, sliceReadFunctions[i].readSlice(resultSet, i + 1));
191+
}
192+
else {
193+
type.writeObject(output, objectReadFunctions[i].readObject(resultSet, i + 1));
172194
}
173-
throw new RuntimeException(e);
174-
}
175-
catch (InterruptedException e) {
176-
Thread.currentThread().interrupt();
177-
resultSetFuture.cancel(true);
178-
throw new RuntimeException(e);
179-
}
180-
finally {
181-
readTimeNanos.addAndGet(System.nanoTime() - start);
182195
}
183196
}
184-
return resultSet.next();
185-
}
186-
catch (SQLException | RuntimeException e) {
187-
throw handleSqlException(e);
188-
}
189-
}
190197

191-
@Override
192-
public boolean getBoolean(int field)
193-
{
194-
checkState(!closed, "cursor is closed");
195-
requireNonNull(resultSet, "resultSet is null");
196-
try {
197-
return booleanReadFunctions[field].readBoolean(resultSet, field + 1);
198+
if (!pageBuilder.isFull()) {
199+
finished = true;
200+
}
198201
}
199202
catch (SQLException | RuntimeException e) {
200203
throw handleSqlException(e);
201204
}
202-
}
203205

204-
@Override
205-
public long getLong(int field)
206-
{
207-
checkState(!closed, "cursor is closed");
208-
requireNonNull(resultSet, "resultSet is null");
209-
try {
210-
return longReadFunctions[field].readLong(resultSet, field + 1);
211-
}
212-
catch (SQLException | RuntimeException e) {
213-
throw handleSqlException(e);
214-
}
206+
Page page = pageBuilder.build();
207+
pageBuilder.reset();
208+
return page;
215209
}
216210

217211
@Override
218-
public double getDouble(int field)
212+
public long getMemoryUsage()
219213
{
220-
checkState(!closed, "cursor is closed");
221-
requireNonNull(resultSet, "resultSet is null");
222-
try {
223-
return doubleReadFunctions[field].readDouble(resultSet, field + 1);
224-
}
225-
catch (SQLException | RuntimeException e) {
226-
throw handleSqlException(e);
227-
}
214+
return pageBuilder.getRetainedSizeInBytes();
228215
}
229216

230217
@Override
231-
public Slice getSlice(int field)
218+
public long getCompletedBytes()
232219
{
233-
checkState(!closed, "cursor is closed");
234-
requireNonNull(resultSet, "resultSet is null");
235-
try {
236-
return sliceReadFunctions[field].readSlice(resultSet, field + 1);
237-
}
238-
catch (SQLException | RuntimeException e) {
239-
throw handleSqlException(e);
240-
}
220+
return 0;
241221
}
242222

243223
@Override
244-
public Object getObject(int field)
224+
public OptionalLong getCompletedPositions()
245225
{
246-
checkState(!closed, "cursor is closed");
247-
requireNonNull(resultSet, "resultSet is null");
248-
try {
249-
return objectReadFunctions[field].readObject(resultSet, field + 1);
250-
}
251-
catch (SQLException | RuntimeException e) {
252-
throw handleSqlException(e);
253-
}
226+
return OptionalLong.of(completedPositions);
254227
}
255228

256229
@Override
257-
public boolean isNull(int field)
230+
public CompletableFuture<?> isBlocked()
258231
{
259-
checkState(!closed, "cursor is closed");
260-
checkArgument(field < columnHandles.length, "Invalid field index");
261-
requireNonNull(resultSet, "resultSet is null");
262-
263-
try {
264-
return readFunctions[field].isNull(resultSet, field + 1);
265-
}
266-
catch (SQLException | RuntimeException e) {
267-
throw handleSqlException(e);
268-
}
232+
return resultSetFuture;
269233
}
270234

271235
@Override
@@ -275,6 +239,7 @@ public void close()
275239
return;
276240
}
277241
closed = true;
242+
finished = true;
278243

279244
// use try with resources to close everything properly
280245
try (Connection connection = this.connection;
@@ -291,11 +256,13 @@ public void close()
291256
}
292257
if (connection != null && resultSet != null) {
293258
jdbcClient.abortReadConnection(connection, resultSet);
259+
resultSetFuture.cancel(true);
294260
}
295261
}
296262
catch (SQLException | RuntimeException e) {
297263
// ignore exception from close
298264
}
265+
resultSet = null;
299266
}
300267

301268
private RuntimeException handleSqlException(Exception e)

0 commit comments

Comments
 (0)