Skip to content

Commit

Permalink
[close #540] rawkv: fix scan return empty set while exist empty key (#…
Browse files Browse the repository at this point in the history
…541) (#548)

* rawkv: cherry-pick fix-scan-empty-key

Signed-off-by: iosmanthus <[email protected]>

* rawkv: fix missing parameters

Signed-off-by: iosmanthus <[email protected]>
  • Loading branch information
iosmanthus authored Mar 1, 2022
1 parent fdb67bc commit e72ba8b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.tikv.kvproto.Kvrpcpb;

public class RawScanIterator extends ScanIterator {

private final BackOffer scanBackOffer;

public RawScanIterator(
Expand Down Expand Up @@ -65,11 +66,12 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
}
}

private boolean notEndOfScan() {
return limit > 0
&& !(processingLastBatch
&& (index >= currentCache.size()
|| Key.toRawKey(currentCache.get(index).getKey()).compareTo(endKey) >= 0));
private boolean endOfScan() {
if (!processingLastBatch) {
return false;
}
ByteString lastKey = currentCache.get(index).getKey();
return !lastKey.isEmpty() && Key.toRawKey(lastKey).compareTo(endKey) >= 0;
}

boolean isCacheDrained() {
Expand All @@ -88,7 +90,7 @@ public boolean hasNext() {
return false;
}
}
return notEndOfScan();
return !endOfScan();
}

private Kvrpcpb.KvPair getCurrent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ boolean cacheLoadFails() {
Key lastKey = Key.EMPTY;
// Session should be single-threaded itself
// so that we don't worry about conf change in the middle
// of a transaction. Otherwise below code might lose data
// of a transaction. Otherwise, below code might lose data
if (currentCache.size() < limit) {
startKey = curRegionEndKey;
lastKey = Key.toRawKey(curRegionEndKey);
Expand Down
64 changes: 60 additions & 4 deletions src/test/java/org/tikv/raw/RawKVClientTest.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
package org.tikv.raw;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -28,6 +46,7 @@
import org.tikv.kvproto.Kvrpcpb;

public class RawKVClientTest extends BaseRawKVTest {

private static final String RAW_PREFIX = "raw_\u0001_";
private static final int KEY_POOL_SIZE = 1000000;
private static final int TEST_CASES = 10000;
Expand Down Expand Up @@ -360,6 +379,40 @@ private List<Kvrpcpb.KvPair> rawKeys() {
return client.scan(RAW_START_KEY, RAW_END_KEY);
}

@Test
public void scanTestForIssue540() {
ByteString splitKeyA = ByteString.copyFromUtf8("splitKeyA");
ByteString splitKeyB = ByteString.copyFromUtf8("splitKeyB");
int splitRegionBackoffMS = 12000;
int scatterRegionBackoffMS = 30000;
int scatterWaitMS = 300000;
session.splitRegionAndScatter(
ImmutableList.of(splitKeyA.toByteArray(), splitKeyB.toByteArray()),
splitRegionBackoffMS,
scatterRegionBackoffMS,
scatterWaitMS);
client.deleteRange(ByteString.EMPTY, ByteString.EMPTY);

client.put(ByteString.EMPTY, ByteString.EMPTY);
client.put(splitKeyA, ByteString.EMPTY);
Assert.assertEquals(0, client.scan(ByteString.EMPTY, 0).size());
Assert.assertEquals(1, client.scan(ByteString.EMPTY, 1).size());
Assert.assertEquals(2, client.scan(ByteString.EMPTY, 2).size());
Assert.assertEquals(2, client.scan(ByteString.EMPTY, 3).size());

client.deleteRange(ByteString.EMPTY, ByteString.EMPTY);

client.put(ByteString.EMPTY, ByteString.EMPTY);
client.put(splitKeyA, ByteString.EMPTY);
client.put(splitKeyA.concat(ByteString.copyFromUtf8("1")), ByteString.EMPTY);
client.put(splitKeyA.concat(ByteString.copyFromUtf8("2")), ByteString.EMPTY);
client.put(splitKeyA.concat(ByteString.copyFromUtf8("3")), ByteString.EMPTY);
client.put(splitKeyB.concat(ByteString.copyFromUtf8("1")), ByteString.EMPTY);
Assert.assertEquals(6, client.scan(ByteString.EMPTY, 7).size());
Assert.assertEquals(0, client.scan(ByteString.EMPTY, -1).size());
client.deleteRange(ByteString.EMPTY, ByteString.EMPTY);
}

@Test
public void validate() {
baseTest(100, 100, 100, 100, false, false, false, false, false);
Expand Down Expand Up @@ -449,7 +502,9 @@ private void prepare() {
int i = cnt;
completionService.submit(
() -> {
for (int j = 0; j < base; j++) checkDelete(remainingKeys.get(i * base + j).getKey());
for (int j = 0; j < base; j++) {
checkDelete(remainingKeys.get(i * base + j).getKey());
}
return null;
});
}
Expand Down Expand Up @@ -942,6 +997,7 @@ private static ByteString rawValue(String value) {
}

private static class ByteStringComparator implements Comparator<ByteString> {

@Override
public int compare(ByteString startKey, ByteString endKey) {
return FastByteComparisons.compareTo(startKey.toByteArray(), endKey.toByteArray());
Expand Down

0 comments on commit e72ba8b

Please sign in to comment.