diff --git a/pom.xml b/pom.xml index dd34f060b3f..1ba5ddf30cb 100644 --- a/pom.xml +++ b/pom.xml @@ -265,6 +265,18 @@ simpleclient_pushgateway 0.10.0 + + org.powermock + powermock-module-junit4 + 2.0.2 + test + + + org.powermock + powermock-api-mockito2 + 2.0.2 + test + diff --git a/src/main/java/org/tikv/common/exception/KeyException.java b/src/main/java/org/tikv/common/exception/KeyException.java index 22ddda982b9..048ac3b35fe 100644 --- a/src/main/java/org/tikv/common/exception/KeyException.java +++ b/src/main/java/org/tikv/common/exception/KeyException.java @@ -31,7 +31,7 @@ public KeyException(String errMsg) { } public KeyException(Kvrpcpb.KeyError keyErr) { - super("Key exception occurred"); + super("Key exception occurred " + keyErr.toString()); this.keyErr = keyErr; } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 22607b2bdb1..3228650c0ff 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -519,7 +519,7 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, lo Lock lock = new Lock(err.getLocked(), codec); locks.add(lock); } else { - throw new KeyException(err.toString()); + throw new KeyException(err); } } if (isSuccess) { diff --git a/src/main/java/org/tikv/txn/TxnKVClient.java b/src/main/java/org/tikv/txn/TxnKVClient.java index 7806c56496e..9c5f4c9da3a 100644 --- a/src/main/java/org/tikv/txn/TxnKVClient.java +++ b/src/main/java/org/tikv/txn/TxnKVClient.java @@ -27,7 +27,6 @@ import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; -import org.tikv.common.exception.KeyException; import org.tikv.common.exception.RegionException; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.exception.TiKVException; @@ -181,7 +180,6 @@ public ClientRPCResult commit( // TODO: check this logic to see are we satisfied? private boolean retryableException(Exception e) { return e instanceof TiClientInternalException - || e instanceof KeyException || e instanceof RegionException || e instanceof StatusRuntimeException; } diff --git a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java index 02a530ffbc6..ecb150b5673 100644 --- a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java +++ b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java @@ -20,17 +20,40 @@ import static org.junit.Assert.fail; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.BaseTxnKVTest; +import org.tikv.common.BytePairWrapper; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.exception.KeyException; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +@RunWith(PowerMockRunner.class) +@PrepareForTest({TiSession.class, TxnKVClient.class, TwoPhaseCommitter.class}) +@PowerMockIgnore({"javax.net.ssl.*"}) public class TwoPhaseCommitterTest extends BaseTxnKVTest { + + private static final Logger logger = LoggerFactory.getLogger(TwoPhaseCommitterTest.class); private static final int WRITE_BUFFER_SIZE = 32 * 1024; private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024; private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000; @@ -76,4 +99,170 @@ public void autoClosableTest() throws Exception { executorService)) {} Assert.assertTrue(executorService.isShutdown()); } + + @Test + public void prewriteWriteConflictFastFailTest() throws Exception { + + int WRITE_BUFFER_SIZE = 32; + String primaryKey = RandomStringUtils.randomAlphabetic(3); + AtomicLong failCount = new AtomicLong(); + ExecutorService executorService = + Executors.newFixedThreadPool( + WRITE_BUFFER_SIZE, + new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); + CountDownLatch latch = new CountDownLatch(2); + int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000; + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + + Thread.sleep(10); + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + latch.await(); + Assert.assertEquals(1, failCount.get()); + } + + @Test + public void prewriteWriteConflictLongNoFailTest() throws Exception { + + int WRITE_BUFFER_SIZE = 32; + String primaryKey = RandomStringUtils.randomAlphabetic(3); + AtomicLong failCount = new AtomicLong(); + ExecutorService executorService = + Executors.newFixedThreadPool( + WRITE_BUFFER_SIZE, + new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); + CountDownLatch latch = new CountDownLatch(2); + int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000; + + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + try { + session = PowerMockito.spy(session); + TxnKVClient kvClient = PowerMockito.spy(session.createTxnClient()); + PowerMockito.when(kvClient, "retryableException", Mockito.any(KeyException.class)) + .thenReturn(true); + PowerMockito.doReturn(kvClient).when(session).createTxnClient(); + + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + + Thread.sleep(10); + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + latch.await(); + Assert.assertEquals(1, failCount.get()); + } }