From 86cf3555dc201e68f31f1b4522ad8a79cd738904 Mon Sep 17 00:00:00 2001 From: Taraka Rama Rao Lethavadla Date: Fri, 17 Oct 2025 17:02:54 +0530 Subject: [PATCH 1/5] HIVE-29254-Display TxnId associated with the query in show processlist command --- .../hadoop/hive/ql/session/ProcessListInfo.java | 12 ++++++++++-- .../cli/operation/ShowProcessListOperation.java | 6 +++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java index 19e31157f8b5..f52c3a298f89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java @@ -32,10 +32,11 @@ public class ProcessListInfo { private final String runtime; // tracks only running portion of the query. private final long elapsedTime; private final String state; + private final long txnId; private ProcessListInfo(String userName, String ipAddr, String sessionId, long sessionActiveTime, long sessionIdleTime, String queryId, String executionEngine, String beginTime, - String runtime, long elapsedTime, String state) { + String runtime, long elapsedTime, String state, long txnId) { this.userName = userName; this.ipAddr = ipAddr; this.sessionId = sessionId; @@ -47,6 +48,7 @@ private ProcessListInfo(String userName, String ipAddr, String sessionId, long s this.runtime = runtime; this.elapsedTime = elapsedTime; this.state = state; + this.txnId = txnId; } public String getSessionId() { @@ -105,6 +107,7 @@ public static class Builder { private String runtime; private long elapsedTime; private String state; + private long txnId; public Builder setSessionId(String sessionId) { this.sessionId = sessionId; @@ -161,10 +164,15 @@ public Builder setState(String state) { return this; } + public Builder setTxnId(long txnId) { + this.txnId = txnId; + return this; + } + public ProcessListInfo build() { ProcessListInfo processListInfo = new ProcessListInfo(userName, ipAddr, sessionId, sessionActiveTime, sessionIdleTime, queryId, executionEngine, beginTime, runtime, - elapsedTime, state); + elapsedTime, state, txnId); return processListInfo; } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java index 12143b970854..0d58ed6fa5ae 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java @@ -71,7 +71,10 @@ private List getLiveQueryInfos(HiveSession parentSession) { LocalDateTime beginTime = LocalDateTime.ofInstant( Instant.ofEpochMilli(query.getBeginTime()), ZoneId.systemDefault() ); - + long txnId = 0; + if(op.queryState != null && op.queryState.getTxnManager() != null ){ + txnId = op.queryState.getTxnManager().getCurrentTxnId(); + } return new ProcessListInfo.Builder() .setUserName(session.getUserName()) .setIpAddr(session.getIpAddress()) @@ -84,6 +87,7 @@ private List getLiveQueryInfos(HiveSession parentSession) { .setRuntime(query.getRuntime() == null ? "Not finished" : String.valueOf(query.getRuntime() / 1000)) .setElapsedTime(query.getElapsedTime() / 1000) .setState(query.getState()) + .setTxnId(txnId) .build(); }) .collect(Collectors.toList()); From 811b2b85f2c37c4b3e41aab4b66ce97b6f84b391 Mon Sep 17 00:00:00 2001 From: Taraka Rama Rao Lethavadla Date: Wed, 22 Oct 2025 01:09:36 +0530 Subject: [PATCH 2/5] HIVE-29254-Display TxnId associated with the query in show processlist command --- .../ql/processor/TestShowProcessList.java | 127 ++++++++++++++++++ .../processors/ShowProcessListProcessor.java | 2 + .../hive/ql/session/ProcessListInfo.java | 2 + .../operation/ShowProcessListOperation.java | 6 +- 4 files changed, 134 insertions(+), 3 deletions(-) create mode 100644 itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java new file mode 100644 index 000000000000..c01962ad689a --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.processor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URL; +import java.sql.*; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class TestShowProcessList { + protected static final Logger LOG = LoggerFactory.getLogger(TestShowProcessList.class); + + private static MiniHS2 miniHS2 = null; + private static HiveConf conf; + private static String user; + private static ThreadPoolExecutor executor; + + static HiveConf defaultConf() throws Exception { + String confDir = "../../data/conf/llap/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + HiveConf defaultConf = new HiveConf(); + defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); + return defaultConf; + } + + @BeforeClass + public static void beforeTest() throws Exception { + conf = defaultConf(); + user = System.getProperty("user.name"); + conf.setVar(HiveConf.ConfVars.USERS_IN_ADMIN_ROLE, user); + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + conf.set(MetastoreConf.ConfVars.WAREHOUSE.name(), new File(System.getProperty( + "java.io.tmpdir") + File.separator + TestShowProcessList.class.getCanonicalName() + "-" + System.currentTimeMillis()).getPath() + .replaceAll("\\\\", "/") + "/warehouse"); + TestTxnDbUtil.setConfValues(conf); + TestTxnDbUtil.prepDb(conf); + MiniHS2.cleanupLocalDir(); + Class.forName(MiniHS2.getJdbcDriverName()); + miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP); + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + + executor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2 != null && miniHS2.isStarted()) { + miniHS2.stop(); + } + TestTxnDbUtil.cleanDb(conf); + } + + @Test + public void testQueries() throws Exception { + //Initiate several parallel connections, each with a query that may begin a transaction. + for (int i = 0; i < 20; i++) { + executor.submit(() -> { + try (Connection con = DriverManager.getConnection(miniHS2.getJdbcURL(), user, "bar"); + Statement stmt = con.createStatement()) { + stmt.execute("drop database if exists DB_" + Thread.currentThread().threadId() + " cascade"); + } catch (Exception ignored) { + } + }); + } + try (Connection testCon = DriverManager.getConnection(miniHS2.getJdbcURL(), user, "bar"); + Statement s = testCon.createStatement()) { + while (executor.getActiveCount() > 0) { + long txnId = executeShowProcessList(s); + System.out.println("txnId is " + txnId); + if (txnId > -1) { // -1 implies that there are no queries running at that moment + Assert.assertTrue(txnId >= 1); + break; + } + } + } + } + + private static long executeShowProcessList(Statement s) { + try (ResultSet rs = s.executeQuery("show processlist")) { + while (rs.next()) { + long txnId = Long.parseLong(rs.getString("Txn ID")); + // TxnId can be 0 because the query has not yet opened txn when show processlist is run. + if (txnId > 0) { + return txnId; + } + } + } catch (Exception e) { + LOG.warn("Exception when checking hive state", e); + } + return -1; + } + +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java index 90182aaaab63..985cd74e6ea1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java @@ -59,6 +59,7 @@ private Schema getSchema() { sch.addToFieldSchemas(new FieldSchema("Session Idle Time (s)", STRING_TYPE_NAME, "")); sch.addToFieldSchemas(new FieldSchema("Query ID", STRING_TYPE_NAME, "")); sch.addToFieldSchemas(new FieldSchema("State", STRING_TYPE_NAME, "")); + sch.addToFieldSchemas(new FieldSchema("Txn ID", STRING_TYPE_NAME, "")); sch.addToFieldSchemas(new FieldSchema("Opened Timestamp (s)", STRING_TYPE_NAME, "")); sch.addToFieldSchemas(new FieldSchema("Elapsed Time (s)", STRING_TYPE_NAME, "")); sch.addToFieldSchemas(new FieldSchema("Runtime (s)", STRING_TYPE_NAME, "")); @@ -91,6 +92,7 @@ public CommandProcessorResponse run(String command) throws CommandProcessorExcep query.getSessionIdleTime(), query.getQueryId(), query.getState(), + query.getTxnId(), query.getBeginTime(), query.getElapsedTime(), query.getRuntime() diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java index f52c3a298f89..74e03ecea4ec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java @@ -95,6 +95,8 @@ public String getState() { return state; } + public Long getTxnId() { return txnId; } + public static class Builder { private String userName; private String ipAddr; diff --git a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java index 0d58ed6fa5ae..77108e3b8b4a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java @@ -72,9 +72,9 @@ private List getLiveQueryInfos(HiveSession parentSession) { Instant.ofEpochMilli(query.getBeginTime()), ZoneId.systemDefault() ); long txnId = 0; - if(op.queryState != null && op.queryState.getTxnManager() != null ){ - txnId = op.queryState.getTxnManager().getCurrentTxnId(); - } + if (op.queryState != null && op.queryState.getTxnManager() != null) { + txnId = op.queryState.getTxnManager().getCurrentTxnId(); + } return new ProcessListInfo.Builder() .setUserName(session.getUserName()) .setIpAddr(session.getIpAddress()) From 848e652757553f3e2a87046e75de0a6a2d3df4bc Mon Sep 17 00:00:00 2001 From: Taraka Rama Rao Lethavadla Date: Wed, 22 Oct 2025 10:25:09 +0530 Subject: [PATCH 3/5] HIVE-29254-Display TxnId associated with the query in show processlist command --- .../ql/processor/TestShowProcessList.java | 28 +++++++++++++------ .../operation/ShowProcessListOperation.java | 6 ++-- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java index c01962ad689a..a5fd4bb0bcb3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java @@ -30,8 +30,13 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.net.URL; -import java.sql.*; +import java.net.URI; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; @@ -48,10 +53,10 @@ public class TestShowProcessList { static HiveConf defaultConf() throws Exception { String confDir = "../../data/conf/llap/"; - HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + HiveConf.setHiveSiteLocation(new URI("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml").toURL()); System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); HiveConf defaultConf = new HiveConf(); - defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); + defaultConf.addResource(new URI("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml").toURL()); return defaultConf; } @@ -62,9 +67,10 @@ public static void beforeTest() throws Exception { conf.setVar(HiveConf.ConfVars.USERS_IN_ADMIN_ROLE, user); conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - conf.set(MetastoreConf.ConfVars.WAREHOUSE.name(), new File(System.getProperty( + String dir = new File(System.getProperty( "java.io.tmpdir") + File.separator + TestShowProcessList.class.getCanonicalName() + "-" + System.currentTimeMillis()).getPath() - .replaceAll("\\\\", "/") + "/warehouse"); + .replaceAll("\\\\", "/") + "/warehouse"; + conf.set(MetastoreConf.ConfVars.WAREHOUSE.name(), dir); TestTxnDbUtil.setConfValues(conf); TestTxnDbUtil.prepDb(conf); MiniHS2.cleanupLocalDir(); @@ -92,7 +98,9 @@ public void testQueries() throws Exception { try (Connection con = DriverManager.getConnection(miniHS2.getJdbcURL(), user, "bar"); Statement stmt = con.createStatement()) { stmt.execute("drop database if exists DB_" + Thread.currentThread().threadId() + " cascade"); - } catch (Exception ignored) { + } catch (Exception exception) { + LOG.error(exception.getMessage()); + LOG.error(Arrays.toString(exception.getStackTrace())); } }); } @@ -118,8 +126,10 @@ private static long executeShowProcessList(Statement s) { return txnId; } } - } catch (Exception e) { - LOG.warn("Exception when checking hive state", e); + } catch (Exception exception) { + LOG.error("Exception when checking hive state", exception); + LOG.error(Arrays.toString(exception.getStackTrace())); + } return -1; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java index 77108e3b8b4a..482ca0e8131e 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java @@ -72,9 +72,9 @@ private List getLiveQueryInfos(HiveSession parentSession) { Instant.ofEpochMilli(query.getBeginTime()), ZoneId.systemDefault() ); long txnId = 0; - if (op.queryState != null && op.queryState.getTxnManager() != null) { - txnId = op.queryState.getTxnManager().getCurrentTxnId(); - } + if (op.queryState != null && op.queryState.getTxnManager() != null) { + txnId = op.queryState.getTxnManager().getCurrentTxnId(); + } return new ProcessListInfo.Builder() .setUserName(session.getUserName()) .setIpAddr(session.getIpAddress()) From c8d0c051012472e8dd43ee59ab84dd7abcab8dbc Mon Sep 17 00:00:00 2001 From: Taraka Rama Rao Lethavadla Date: Wed, 22 Oct 2025 13:07:18 +0530 Subject: [PATCH 4/5] HIVE-29254-Display TxnId associated with the query in show processlist command --- .../hadoop/hive/ql/processor/TestShowProcessList.java | 9 +++++---- .../apache/hadoop/hive/ql/session/ProcessListInfo.java | 4 +++- .../service/cli/operation/ShowProcessListOperation.java | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java index a5fd4bb0bcb3..f8eed0ba6a5d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java @@ -67,8 +67,8 @@ public static void beforeTest() throws Exception { conf.setVar(HiveConf.ConfVars.USERS_IN_ADMIN_ROLE, user); conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - String dir = new File(System.getProperty( - "java.io.tmpdir") + File.separator + TestShowProcessList.class.getCanonicalName() + "-" + System.currentTimeMillis()).getPath() + String suffix = TestShowProcessList.class.getCanonicalName() + "-" + System.currentTimeMillis(); + String dir = new File(System.getProperty("java.io.tmpdir") + File.separator + suffix).getPath() .replaceAll("\\\\", "/") + "/warehouse"; conf.set(MetastoreConf.ConfVars.WAREHOUSE.name(), dir); TestTxnDbUtil.setConfValues(conf); @@ -109,7 +109,8 @@ public void testQueries() throws Exception { while (executor.getActiveCount() > 0) { long txnId = executeShowProcessList(s); System.out.println("txnId is " + txnId); - if (txnId > -1) { // -1 implies that there are no queries running at that moment + // -1 implies that there are no queries running at that moment or txn is not yet opened + if (txnId > -1) { Assert.assertTrue(txnId >= 1); break; } @@ -134,4 +135,4 @@ private static long executeShowProcessList(Statement s) { return -1; } -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java index 74e03ecea4ec..04b59141b6d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java @@ -95,7 +95,9 @@ public String getState() { return state; } - public Long getTxnId() { return txnId; } + public Long getTxnId() { + return txnId; + } public static class Builder { private String userName; diff --git a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java index 482ca0e8131e..e667f3f6b919 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java @@ -73,7 +73,7 @@ private List getLiveQueryInfos(HiveSession parentSession) { ); long txnId = 0; if (op.queryState != null && op.queryState.getTxnManager() != null) { - txnId = op.queryState.getTxnManager().getCurrentTxnId(); + txnId = op.queryState.getTxnManager().getCurrentTxnId(); } return new ProcessListInfo.Builder() .setUserName(session.getUserName()) From 6ff17c263dda0f8c00dcdf1697a2e6e8bf421d41 Mon Sep 17 00:00:00 2001 From: Taraka Rama Rao Lethavadla Date: Tue, 28 Oct 2025 12:47:48 +0530 Subject: [PATCH 5/5] HIVE-29254-Display TxnId associated with the query in show processlist command --- .../ql/processor/TestShowProcessList.java | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java index f8eed0ba6a5d..7c57db9b2498 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/processor/TestShowProcessList.java @@ -38,7 +38,11 @@ import java.sql.Statement; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.Collections; + import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -92,8 +96,9 @@ public static void afterTest() throws Exception { @Test public void testQueries() throws Exception { - //Initiate several parallel connections, each with a query that may begin a transaction. - for (int i = 0; i < 20; i++) { + int connections = 10; + //Initiate 10 parallel connections, each with a query that begins a transaction. + for (int i = 0; i < connections; i++) { executor.submit(() -> { try (Connection con = DriverManager.getConnection(miniHS2.getJdbcURL(), user, "bar"); Statement stmt = con.createStatement()) { @@ -104,27 +109,28 @@ public void testQueries() throws Exception { } }); } + Set txnIds = new HashSet<>(); try (Connection testCon = DriverManager.getConnection(miniHS2.getJdbcURL(), user, "bar"); Statement s = testCon.createStatement()) { while (executor.getActiveCount() > 0) { - long txnId = executeShowProcessList(s); - System.out.println("txnId is " + txnId); - // -1 implies that there are no queries running at that moment or txn is not yet opened - if (txnId > -1) { - Assert.assertTrue(txnId >= 1); - break; - } + // retrieve txnIds from show processlist output + txnIds.addAll(getTxnIdsFromShowProcesslist(s)); } } + System.out.println(txnIds); + // max txnId should be equal to the number of connections + int maxTxnId = Collections.max(txnIds); + Assert.assertEquals(maxTxnId, connections); } - private static long executeShowProcessList(Statement s) { + private static Set getTxnIdsFromShowProcesslist(Statement s) { + Set txnIds = new HashSet<>(); try (ResultSet rs = s.executeQuery("show processlist")) { while (rs.next()) { - long txnId = Long.parseLong(rs.getString("Txn ID")); + int txnId = Integer.parseInt(rs.getString("Txn ID")); // TxnId can be 0 because the query has not yet opened txn when show processlist is run. if (txnId > 0) { - return txnId; + txnIds.add(txnId); } } } catch (Exception exception) { @@ -132,7 +138,7 @@ private static long executeShowProcessList(Statement s) { LOG.error(Arrays.toString(exception.getStackTrace())); } - return -1; + return txnIds; } }