From de677ed0ef0a91dfe6b64c7246253c6f40e67e70 Mon Sep 17 00:00:00 2001 From: ogerson Date: Thu, 7 Nov 2019 16:57:22 +0200 Subject: [PATCH] Added query timeout support Added queryTimeout_not_set setting, having the query timeout not explicitly set by the user thus avoiding overriding a timeout that might be set by the connection --- .../java/org/davidmoten/rx/jdbc/Select.java | 18 +++++++-------- .../org/davidmoten/rx/jdbc/SelectBuilder.java | 9 +++++++- .../TransactedSelectAutomappedBuilder.java | 5 ++-- .../rx/jdbc/TransactedSelectBuilder.java | 3 ++- .../rx/jdbc/TransactedUpdateBuilder.java | 3 ++- .../java/org/davidmoten/rx/jdbc/Update.java | 12 +++++----- .../org/davidmoten/rx/jdbc/UpdateBuilder.java | 9 +++++++- .../java/org/davidmoten/rx/jdbc/Util.java | 23 +++++++++++-------- 8 files changed, 52 insertions(+), 30 deletions(-) diff --git a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Select.java b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Select.java index 97f986a2..2ea6af63 100644 --- a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Select.java +++ b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Select.java @@ -25,27 +25,27 @@ private Select() { private static final Logger log = LoggerFactory.getLogger(Select.class); static Flowable create(Single connections, - Flowable> parameterGroups, String sql, int fetchSize, - Function mapper, boolean eagerDispose) { + Flowable> parameterGroups, String sql, int fetchSize, + Function mapper, boolean eagerDispose, int queryTimeoutSec) { return connections // .toFlowable() // - .flatMap(con -> create(con, sql, parameterGroups, fetchSize, mapper, eagerDispose)); + .flatMap(con -> create(con, sql, parameterGroups, fetchSize, mapper, eagerDispose, queryTimeoutSec)); } static Flowable create(Connection con, String sql, - Flowable> parameterGroups, int fetchSize, - Function mapper, boolean eagerDispose) { + Flowable> parameterGroups, int fetchSize, + Function mapper, boolean eagerDispose, int queryTimeoutSec) { log.debug("Select.create called with con={}", con); - Callable initialState = () -> Util.prepare(con, fetchSize, sql); + Callable initialState = () -> Util.prepare(con, fetchSize, sql, queryTimeoutSec); Function> observableFactory = ps -> parameterGroups - .flatMap(parameters -> create(ps.ps, parameters, mapper, ps.names, sql, fetchSize), + .flatMap(parameters -> create(ps.ps, parameters, mapper, ps.names, sql, fetchSize, queryTimeoutSec), true, 1); Consumer disposer = Util::closePreparedStatementAndConnection; return Flowable.using(initialState, observableFactory, disposer, eagerDispose); } private static Flowable create(PreparedStatement ps, List parameters, - Function mapper, List names, String sql, int fetchSize) { + Function mapper, List names, String sql, int fetchSize, int queryTimeoutSec) { log.debug("parameters={}", parameters); log.debug("names={}", names); @@ -56,7 +56,7 @@ private static Flowable create(PreparedStatement ps, List private final Database db; int fetchSize = 0; // default + int queryTimeoutSec = Util.QUERY_TIMEOUT_NOT_SET; //default private Flowable dependsOn; SelectBuilder(String sql, Single connection, Database db) { @@ -43,6 +44,12 @@ public SelectBuilder fetchSize(int size) { return this; } + public SelectBuilder queryTimeoutSec(int timeoutSec) { + Preconditions.checkArgument(timeoutSec >= 0); + this.queryTimeoutSec = timeoutSec; + return this; + } + public TransactedSelectBuilder transacted() { return new TransactedSelectBuilder(this, db); } @@ -55,7 +62,7 @@ public TransactedSelectBuilder transactedValuesOnly() { public Flowable get(@Nonnull ResultSetMapper mapper) { Preconditions.checkNotNull(mapper, "mapper cannot be null"); Flowable> pg = super.parameterGroupsToFlowable(); - Flowable f = Select.create(connection, pg, sql, fetchSize, mapper, true); + Flowable f = Select.create(connection, pg, sql, fetchSize, mapper, true, queryTimeoutSec); if (dependsOn != null) { return dependsOn.ignoreElements().andThen(f); } else { diff --git a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectAutomappedBuilder.java b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectAutomappedBuilder.java index 66d95fb7..2319ab61 100644 --- a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectAutomappedBuilder.java +++ b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectAutomappedBuilder.java @@ -96,7 +96,8 @@ private static Flowable> createFlowable(SelectAutomappedBuilder sb, sb.selectBuilder.sql, // sb.selectBuilder.fetchSize, // Util.autoMap(sb.cls), // - false) // + false, // + sb.selectBuilder.queryTimeoutSec) // .materialize() // .flatMap(n -> Tx.toTx(n, connection.get(), db)) // .doOnNext(tx -> { @@ -107,4 +108,4 @@ private static Flowable> createFlowable(SelectAutomappedBuilder sb, }); } -} \ No newline at end of file +} diff --git a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectBuilder.java b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectBuilder.java index 39fa3c48..63b58a3f 100644 --- a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectBuilder.java +++ b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectBuilder.java @@ -105,7 +105,8 @@ private static Flowable> createFlowable(SelectBuilder sb, sb.sql, // sb.fetchSize, // mapper, // - false) // + false, // + sb.queryTimeoutSec) // .materialize() // .flatMap(n -> Tx.toTx(n, connection.get(), db)) // .doOnNext(tx -> { diff --git a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedUpdateBuilder.java b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedUpdateBuilder.java index 2a5e79d6..5db864c9 100644 --- a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedUpdateBuilder.java +++ b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedUpdateBuilder.java @@ -137,7 +137,8 @@ private static Flowable> createFlowable(UpdateBuilder ub, Database d ub.parameterGroupsToFlowable(), // ub.sql, // ub.batchSize, // - false) // + false, // + ub.queryTimeoutSec) // .flatMap(n -> Tx.toTx(n, connection.get(), db)) // .doOnNext(tx -> { t[0] = ((TxImpl) tx); diff --git a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Update.java b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Update.java index 67ea03ea..bff9a350 100644 --- a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Update.java +++ b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Update.java @@ -28,18 +28,18 @@ private Update() { } static Flowable> create(Single connection, - Flowable> parameterGroups, String sql, int batchSize, - boolean eagerDispose) { + Flowable> parameterGroups, String sql, int batchSize, + boolean eagerDispose, int queryTimeoutSec) { return connection // .toFlowable() // - .flatMap(con -> create(con, sql, parameterGroups, batchSize, eagerDispose), true, + .flatMap(con -> create(con, sql, parameterGroups, batchSize, eagerDispose, queryTimeoutSec), true, 1); } private static Flowable> create(Connection con, String sql, - Flowable> parameterGroups, int batchSize, boolean eagerDispose) { + Flowable> parameterGroups, int batchSize, boolean eagerDispose, int queryTimeoutSec) { log.debug("Update.create {}", sql); - Callable resourceFactory = () -> Util.prepare(con, sql); + Callable resourceFactory = () -> Util.prepare(con, sql, queryTimeoutSec); final Function>> flowableFactory; if (batchSize == 0) { flowableFactory = ps -> parameterGroups // @@ -101,7 +101,7 @@ private static Single create(NamedPreparedStatement ps, List if (hasCollection) { // create a new prepared statement with the collection ? substituted with // ?s to match the size of the collection parameter - ps2 = Util.prepare(ps.ps.getConnection(), 0, sql, params); + ps2 = Util.prepare(ps.ps.getConnection(), 0, sql, params, ps.ps.getQueryTimeout()); } else { ps2 = ps.ps; } diff --git a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/UpdateBuilder.java b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/UpdateBuilder.java index 5570147d..ffdaf81d 100644 --- a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/UpdateBuilder.java +++ b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/UpdateBuilder.java @@ -20,6 +20,7 @@ public final class UpdateBuilder extends ParametersBuilder implem private final Database db; Flowable dependsOn; int batchSize = DEFAULT_BATCH_SIZE; + int queryTimeoutSec = Util.QUERY_TIMEOUT_NOT_SET; UpdateBuilder(String sql, Single connections, Database db) { super(sql); @@ -40,6 +41,12 @@ public UpdateBuilder batchSize(int batchSize) { return this; } + public UpdateBuilder queryTimeoutSec(int queryTimeoutSec) { + Preconditions.checkArgument(queryTimeoutSec >= 0); + this.queryTimeoutSec = queryTimeoutSec; + return this; + } + /** * Returns a builder used to specify how to process the generated keys * {@link ResultSet}. Not all jdbc drivers support this functionality and @@ -56,7 +63,7 @@ public ReturnGeneratedKeysBuilder returnGeneratedKeys() { public Flowable counts() { return startWithDependency( - Update.create(connections, super.parameterGroupsToFlowable(), sql, batchSize, true).dematerialize()); + Update.create(connections, super.parameterGroupsToFlowable(), sql, batchSize, true, queryTimeoutSec).dematerialize()); } Flowable startWithDependency(@Nonnull Flowable f) { diff --git a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Util.java b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Util.java index e86b347e..91d9ef47 100644 --- a/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Util.java +++ b/rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Util.java @@ -64,6 +64,7 @@ public enum Util { ; private static final Logger log = LoggerFactory.getLogger(Util.class); + public static final int QUERY_TIMEOUT_NOT_SET = -1; /** * Sets parameters for the {@link PreparedStatement}. @@ -287,32 +288,32 @@ static void closeCallableStatementAndConnection(NamedCallableStatement stmt) { closePreparedStatementAndConnection(stmt.stmt); } - static NamedPreparedStatement prepare(Connection con, String sql) throws SQLException { - return prepare(con, 0, sql); + static NamedPreparedStatement prepare(Connection con, String sql, int queryTimeoutSec) throws SQLException { + return prepare(con, 0, sql, queryTimeoutSec); } - static NamedPreparedStatement prepare(Connection con, int fetchSize, String sql) throws SQLException { + static NamedPreparedStatement prepare(Connection con, int fetchSize, String sql, int queryTimeoutSec) throws SQLException { // TODO can we parse SqlInfo through because already calculated by // builder? SqlInfo info = SqlInfo.parse(sql); log.debug("preparing statement: {}", sql); - return prepare(con, fetchSize, info); + return prepare(con, fetchSize, info, queryTimeoutSec); } - static PreparedStatement prepare(Connection connection, int fetchSize, String sql, List parameters) + static PreparedStatement prepare(Connection connection, int fetchSize, String sql, List parameters, int queryTimeoutSec) throws SQLException { // should only get here when parameters contains a collection SqlInfo info = SqlInfo.parse(sql, parameters); log.debug("preparing statement: {}", info.sql()); - return createPreparedStatement(connection, fetchSize, info); + return createPreparedStatement(connection, fetchSize, info, queryTimeoutSec); } - private static NamedPreparedStatement prepare(Connection con, int fetchSize, SqlInfo info) throws SQLException { - PreparedStatement ps = createPreparedStatement(con, fetchSize, info); + private static NamedPreparedStatement prepare(Connection con, int fetchSize, SqlInfo info, int queryTimeoutSec) throws SQLException { + PreparedStatement ps = createPreparedStatement(con, fetchSize, info, queryTimeoutSec); return new NamedPreparedStatement(ps, info.names()); } - private static PreparedStatement createPreparedStatement(Connection con, int fetchSize, SqlInfo info) + private static PreparedStatement createPreparedStatement(Connection con, int fetchSize, SqlInfo info, int queryTimeoutSec) throws SQLException { PreparedStatement ps = null; try { @@ -320,6 +321,10 @@ private static PreparedStatement createPreparedStatement(Connection con, int fet if (fetchSize > 0) { ps.setFetchSize(fetchSize); } + + if (queryTimeoutSec != QUERY_TIMEOUT_NOT_SET) { + ps.setQueryTimeout(queryTimeoutSec); + } } catch (RuntimeException | SQLException e) { if (ps != null) { ps.close();