Skip to content

Commit

Permalink
fix KeyException no retry & no detail error msg(tikv#778)
Browse files Browse the repository at this point in the history
  • Loading branch information
b.tian committed Jan 18, 2024
1 parent aedfd66 commit 2be0907
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 4 deletions.
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,18 @@
<artifactId>simpleclient_pushgateway</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>2.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<reporting>
<plugins>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/common/exception/KeyException.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public KeyException(String errMsg) {
}

public KeyException(Kvrpcpb.KeyError keyErr) {
super("Key exception occurred");
super("Key exception occurred " + keyErr.toString());
this.keyErr = keyErr;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, lo
Lock lock = new Lock(err.getLocked(), codec);
locks.add(lock);
} else {
throw new KeyException(err.toString());
throw new KeyException(err);
}
}
if (isSuccess) {
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/org/tikv/txn/TxnKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.exception.TiKVException;
Expand Down Expand Up @@ -181,7 +180,6 @@ public ClientRPCResult commit(
// TODO: check this logic to see are we satisfied?
private boolean retryableException(Exception e) {
return e instanceof TiClientInternalException
|| e instanceof KeyException
|| e instanceof RegionException
|| e instanceof StatusRuntimeException;
}
Expand Down
189 changes: 189 additions & 0 deletions src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,40 @@
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.BaseTxnKVTest;
import org.tikv.common.BytePairWrapper;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.KeyException;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;

@RunWith(PowerMockRunner.class)
@PrepareForTest({TiSession.class, TxnKVClient.class, TwoPhaseCommitter.class})
@PowerMockIgnore({"javax.net.ssl.*"})
public class TwoPhaseCommitterTest extends BaseTxnKVTest {

private static final Logger logger = LoggerFactory.getLogger(TwoPhaseCommitterTest.class);
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024;
private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000;
Expand Down Expand Up @@ -76,4 +99,170 @@ public void autoClosableTest() throws Exception {
executorService)) {}
Assert.assertTrue(executorService.isShutdown());
}

@Test
public void prewriteWriteConflictFastFailTest() throws Exception {

int WRITE_BUFFER_SIZE = 32;
String primaryKey = RandomStringUtils.randomAlphabetic(3);
AtomicLong failCount = new AtomicLong();
ExecutorService executorService =
Executors.newFixedThreadPool(
WRITE_BUFFER_SIZE,
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
CountDownLatch latch = new CountDownLatch(2);
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;
new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();

Thread.sleep(10);
new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();
latch.await();
Assert.assertEquals(1, failCount.get());
}

@Test
public void prewriteWriteConflictLongNoFailTest() throws Exception {

int WRITE_BUFFER_SIZE = 32;
String primaryKey = RandomStringUtils.randomAlphabetic(3);
AtomicLong failCount = new AtomicLong();
ExecutorService executorService =
Executors.newFixedThreadPool(
WRITE_BUFFER_SIZE,
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
CountDownLatch latch = new CountDownLatch(2);
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;

new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

try {
session = PowerMockito.spy(session);
TxnKVClient kvClient = PowerMockito.spy(session.createTxnClient());
PowerMockito.when(kvClient, "retryableException", Mockito.any(KeyException.class))
.thenReturn(true);
PowerMockito.doReturn(kvClient).when(session).createTxnClient();

TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();

Thread.sleep(10);
new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();
latch.await();
Assert.assertEquals(1, failCount.get());
}
}

0 comments on commit 2be0907

Please sign in to comment.