Skip to content

Commit 78b5e48

Browse files
committed
GH-494: [Flight] Allow configuring connect timeout in JDBC
Fixes #494.
1 parent 8d3fe93 commit 78b5e48

File tree

8 files changed

+240
-33
lines changed

8 files changed

+240
-33
lines changed

Diff for: flight/flight-sql-jdbc-core/pom.xml

+15
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,21 @@ under the License.
4747
</exclusions>
4848
</dependency>
4949

50+
<dependency>
51+
<groupId>io.grpc</groupId>
52+
<artifactId>grpc-api</artifactId>
53+
</dependency>
54+
55+
<dependency>
56+
<groupId>io.grpc</groupId>
57+
<artifactId>grpc-netty</artifactId>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>io.netty</groupId>
62+
<artifactId>netty-transport</artifactId>
63+
</dependency>
64+
5065
<dependency>
5166
<groupId>org.apache.arrow</groupId>
5267
<artifactId>arrow-memory-core</artifactId>

Diff for: flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ private static ArrowFlightSqlClientHandler createNewClientHandler(
113113
.withRetainCookies(config.retainCookies())
114114
.withRetainAuth(config.retainAuth())
115115
.withCatalog(config.getCatalog())
116+
.withConnectTimeout(config.getConnectTimeout())
116117
.build();
117118
} catch (final SQLException e) {
118119
try {

Diff for: flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java

+39-13
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package org.apache.arrow.driver.jdbc.client;
1818

1919
import com.google.common.collect.ImmutableMap;
20+
import io.grpc.netty.NettyChannelBuilder;
21+
import io.netty.channel.ChannelOption;
2022
import java.io.IOException;
2123
import java.net.URI;
2224
import java.security.GeneralSecurityException;
2325
import java.sql.SQLException;
26+
import java.time.Duration;
2427
import java.util.ArrayList;
2528
import java.util.Arrays;
2629
import java.util.Collection;
@@ -36,6 +39,7 @@
3639
import org.apache.arrow.flight.FlightClient;
3740
import org.apache.arrow.flight.FlightClientMiddleware;
3841
import org.apache.arrow.flight.FlightEndpoint;
42+
import org.apache.arrow.flight.FlightGrpcUtils;
3943
import org.apache.arrow.flight.FlightInfo;
4044
import org.apache.arrow.flight.FlightRuntimeException;
4145
import org.apache.arrow.flight.FlightStatusCode;
@@ -50,6 +54,7 @@
5054
import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
5155
import org.apache.arrow.flight.client.ClientCookieMiddleware;
5256
import org.apache.arrow.flight.grpc.CredentialCallOption;
57+
import org.apache.arrow.flight.grpc.NettyClientBuilder;
5358
import org.apache.arrow.flight.sql.FlightSqlClient;
5459
import org.apache.arrow.flight.sql.impl.FlightSql.SqlInfo;
5560
import org.apache.arrow.flight.sql.util.TableRef;
@@ -138,12 +143,11 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
138143
// Clone the builder and then set the new endpoint on it.
139144

140145
// GH-38574: Currently a new FlightClient will be made for each partition that returns a
141-
// non-empty Location
142-
// then disposed of. It may be better to cache clients because a server may report the
143-
// same Locations.
144-
// It would also be good to identify when the reported location is the same as the
145-
// original connection's
146-
// Location and skip creating a FlightClient in that scenario.
146+
// non-empty Location then disposed of. It may be better to cache clients because a server
147+
// may report the same Locations. It would also be good to identify when the reported
148+
// location
149+
// is the same as the original connection's Location and skip creating a FlightClient in
150+
// that scenario.
147151
List<Exception> exceptions = new ArrayList<>();
148152
CloseableEndpointStreamPair stream = null;
149153
for (Location location : endpoint.getLocations()) {
@@ -158,7 +162,8 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
158162
new Builder(ArrowFlightSqlClientHandler.this.builder)
159163
.withHost(endpointUri.getHost())
160164
.withPort(endpointUri.getPort())
161-
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
165+
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS))
166+
.withConnectTimeout(builder.connectTimeout);
162167

163168
ArrowFlightSqlClientHandler endpointHandler = null;
164169
try {
@@ -177,6 +182,7 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
177182
exceptions.add(ex);
178183
continue;
179184
}
185+
180186
break;
181187
}
182188
if (stream != null) {
@@ -543,6 +549,8 @@ public static final class Builder {
543549

544550
@VisibleForTesting Optional<String> catalog = Optional.empty();
545551

552+
@VisibleForTesting @Nullable Duration connectTimeout;
553+
546554
// These two middleware are for internal use within build() and should not be exposed by builder
547555
// APIs.
548556
// Note that these middleware may not necessarily be registered.
@@ -825,6 +833,19 @@ public Builder withCatalog(@Nullable final String catalog) {
825833
return this;
826834
}
827835

836+
public Builder withConnectTimeout(Duration connectTimeout) {
837+
this.connectTimeout = connectTimeout;
838+
return this;
839+
}
840+
841+
/** Get the location that this client will connect to. */
842+
public Location getLocation() {
843+
if (useEncryption) {
844+
return Location.forGrpcTls(host, port);
845+
}
846+
return Location.forGrpcInsecure(host, port);
847+
}
848+
828849
/**
829850
* Builds a new {@link ArrowFlightSqlClientHandler} from the provided fields.
830851
*
@@ -845,17 +866,15 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
845866
if (isUsingUserPasswordAuth) {
846867
buildTimeMiddlewareFactories.add(authFactory);
847868
}
848-
final FlightClient.Builder clientBuilder = FlightClient.builder().allocator(allocator);
869+
final NettyClientBuilder clientBuilder = new NettyClientBuilder();
870+
clientBuilder.allocator(allocator);
849871

850872
buildTimeMiddlewareFactories.add(new ClientCookieMiddleware.Factory());
851873
buildTimeMiddlewareFactories.forEach(clientBuilder::intercept);
852-
Location location;
853874
if (useEncryption) {
854-
location = Location.forGrpcTls(host, port);
855875
clientBuilder.useTls();
856-
} else {
857-
location = Location.forGrpcInsecure(host, port);
858876
}
877+
Location location = getLocation();
859878
clientBuilder.location(location);
860879

861880
if (useEncryption) {
@@ -883,7 +902,14 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
883902
}
884903
}
885904

886-
client = clientBuilder.build();
905+
NettyChannelBuilder channelBuilder = clientBuilder.build();
906+
if (connectTimeout != null) {
907+
channelBuilder.withOption(
908+
ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout.toMillis());
909+
}
910+
client =
911+
FlightGrpcUtils.createFlightClient(
912+
allocator, channelBuilder.build(), clientBuilder.middleware());
887913
final ArrayList<CallOption> credentialOptions = new ArrayList<>();
888914
if (isUsingUserPasswordAuth) {
889915
// If the authFactory has already been used for a handshake, use the existing token.

Diff for: flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.arrow.driver.jdbc.utils;
1818

19+
import java.time.Duration;
1920
import java.util.Arrays;
2021
import java.util.HashMap;
2122
import java.util.Map;
@@ -163,6 +164,16 @@ public String getCatalog() {
163164
return ArrowFlightConnectionProperty.CATALOG.getString(properties);
164165
}
165166

167+
/** The initial connect timeout. */
168+
public Duration getConnectTimeout() {
169+
Integer timeout = ArrowFlightConnectionProperty.CONNECT_TIMEOUT_MILLIS.getInteger(properties);
170+
if (timeout == null) {
171+
return Duration.ofMillis(
172+
(int) ArrowFlightConnectionProperty.CONNECT_TIMEOUT_MILLIS.defaultValue());
173+
}
174+
return Duration.ofMillis(timeout);
175+
}
176+
166177
/**
167178
* Gets the {@link CallOption}s from this {@link ConnectionConfig}.
168179
*
@@ -213,7 +224,9 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty {
213224
TOKEN("token", null, Type.STRING, false),
214225
RETAIN_COOKIES("retainCookies", true, Type.BOOLEAN, false),
215226
RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false),
216-
CATALOG("catalog", null, Type.STRING, false);
227+
CATALOG("catalog", null, Type.STRING, false),
228+
CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false),
229+
;
217230

218231
private final String camelName;
219232
private final Object defaultValue;

Diff for: flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java

+134-6
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,7 @@
2525
import static org.hamcrest.CoreMatchers.instanceOf;
2626
import static org.hamcrest.CoreMatchers.is;
2727
import static org.hamcrest.MatcherAssert.assertThat;
28-
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
29-
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
30-
import static org.junit.jupiter.api.Assertions.assertEquals;
31-
import static org.junit.jupiter.api.Assertions.assertThrows;
32-
import static org.junit.jupiter.api.Assertions.assertTrue;
33-
import static org.junit.jupiter.api.Assertions.fail;
28+
import static org.junit.jupiter.api.Assertions.*;
3429

3530
import com.google.common.collect.ImmutableSet;
3631
import java.nio.charset.StandardCharsets;
@@ -645,6 +640,139 @@ public void testFallbackSecondFlightServer() throws Exception {
645640
}
646641
}
647642

643+
@Test
644+
public void testFallbackUnresolvableFlightServer() throws Exception {
645+
final Schema schema =
646+
new Schema(
647+
Collections.singletonList(Field.nullable("int_column", Types.MinorType.INT.getType())));
648+
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
649+
VectorSchemaRoot resultData = VectorSchemaRoot.create(schema, allocator)) {
650+
resultData.setRowCount(1);
651+
((IntVector) resultData.getVector(0)).set(0, 1);
652+
653+
try (final FallbackFlightSqlProducer rootProducer =
654+
new FallbackFlightSqlProducer(resultData);
655+
FlightServer rootServer =
656+
FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), rootProducer)
657+
.build()
658+
.start();
659+
Connection newConnection =
660+
DriverManager.getConnection(
661+
String.format(
662+
"jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
663+
rootServer.getLocation().getUri().getHost(), rootServer.getPort()))) {
664+
// This first attempt should take a measurable amount of time.
665+
long start = System.nanoTime();
666+
try (Statement newStatement = newConnection.createStatement()) {
667+
try (ResultSet result = newStatement.executeQuery("fallback with unresolvable")) {
668+
List<Integer> actualData = new ArrayList<>();
669+
while (result.next()) {
670+
actualData.add(result.getInt(1));
671+
}
672+
673+
// Assert
674+
assertEquals(resultData.getRowCount(), actualData.size());
675+
assertTrue(actualData.contains(((IntVector) resultData.getVector(0)).get(0)));
676+
}
677+
}
678+
long attempt1 = System.nanoTime();
679+
double elapsedMs = (attempt1 - start) / 1_000_000.;
680+
assertTrue(
681+
elapsedMs >= 5000.,
682+
String.format(
683+
"Expected first attempt to hit the timeout, but only %f ms elapsed", elapsedMs));
684+
685+
// Once the client cache is implemented (GH-661), this second attempt should take less time,
686+
// since the failure from before should be cached.
687+
start = System.nanoTime();
688+
try (Statement newStatement = newConnection.createStatement()) {
689+
try (ResultSet result = newStatement.executeQuery("fallback with unresolvable")) {
690+
List<Integer> actualData = new ArrayList<>();
691+
while (result.next()) {
692+
actualData.add(result.getInt(1));
693+
}
694+
695+
// Assert
696+
assertEquals(resultData.getRowCount(), actualData.size());
697+
assertTrue(actualData.contains(((IntVector) resultData.getVector(0)).get(0)));
698+
}
699+
}
700+
attempt1 = System.nanoTime();
701+
elapsedMs = (attempt1 - start) / 1_000_000.;
702+
// TODO(GH-661): this assertion should be flipped to assertTrue.
703+
assertFalse(
704+
elapsedMs < 5000.,
705+
String.format("Expected second attempt to be the same, but %f ms elapsed", elapsedMs));
706+
}
707+
}
708+
}
709+
710+
@Test
711+
public void testFallbackUnresolvableFlightServerDisableCache() throws Exception {
712+
final Schema schema =
713+
new Schema(
714+
Collections.singletonList(Field.nullable("int_column", Types.MinorType.INT.getType())));
715+
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
716+
VectorSchemaRoot resultData = VectorSchemaRoot.create(schema, allocator)) {
717+
resultData.setRowCount(1);
718+
((IntVector) resultData.getVector(0)).set(0, 1);
719+
720+
try (final FallbackFlightSqlProducer rootProducer =
721+
new FallbackFlightSqlProducer(resultData);
722+
FlightServer rootServer =
723+
FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), rootProducer)
724+
.build()
725+
.start();
726+
Connection newConnection =
727+
DriverManager.getConnection(
728+
String.format(
729+
"jdbc:arrow-flight-sql://%s:%d/?useEncryption=false&useClientCache=false",
730+
rootServer.getLocation().getUri().getHost(), rootServer.getPort()))) {
731+
// This first attempt should take a measurable amount of time.
732+
long start = System.nanoTime();
733+
try (Statement newStatement = newConnection.createStatement()) {
734+
try (ResultSet result = newStatement.executeQuery("fallback with unresolvable")) {
735+
List<Integer> actualData = new ArrayList<>();
736+
while (result.next()) {
737+
actualData.add(result.getInt(1));
738+
}
739+
740+
// Assert
741+
assertEquals(resultData.getRowCount(), actualData.size());
742+
assertTrue(actualData.contains(((IntVector) resultData.getVector(0)).get(0)));
743+
}
744+
}
745+
long attempt1 = System.nanoTime();
746+
double elapsedMs = (attempt1 - start) / 1_000_000.;
747+
assertTrue(
748+
elapsedMs >= 5000.,
749+
String.format(
750+
"Expected first attempt to hit the timeout, but only %f ms elapsed", elapsedMs));
751+
752+
// This second attempt should take a long time still, since we disabled the cache.
753+
start = System.nanoTime();
754+
try (Statement newStatement = newConnection.createStatement()) {
755+
try (ResultSet result = newStatement.executeQuery("fallback with unresolvable")) {
756+
List<Integer> actualData = new ArrayList<>();
757+
while (result.next()) {
758+
actualData.add(result.getInt(1));
759+
}
760+
761+
// Assert
762+
assertEquals(resultData.getRowCount(), actualData.size());
763+
assertTrue(actualData.contains(((IntVector) resultData.getVector(0)).get(0)));
764+
}
765+
}
766+
attempt1 = System.nanoTime();
767+
elapsedMs = (attempt1 - start) / 1_000_000.;
768+
assertTrue(
769+
elapsedMs >= 5000.,
770+
String.format(
771+
"Expected second attempt to hit the timeout, but only %f ms elapsed", elapsedMs));
772+
}
773+
}
774+
}
775+
648776
@Test
649777
public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception {
650778
try (Statement statement = connection.createStatement();

Diff for: flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public void testDefaults() {
147147
assertNull(builder.clientCertificatePath);
148148
assertNull(builder.clientKeyPath);
149149
assertEquals(Optional.empty(), builder.catalog);
150+
assertNull(builder.connectTimeout);
150151
}
151152

152153
@Test

0 commit comments

Comments
 (0)