Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

refactor: use scanner to refactor range operation(get&delete) #151

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion dev-support/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<Package name="com.xiaomi.infra.pegasus.client"/>
<Package name="com.xiaomi.infra.pegasus.metrics"/>
<Package name="com.xiaomi.infra.pegasus.tools"/>
<Package name="com.xiaomi.infra.pegasus.client.request"/>
<Package name="com.xiaomi.infra.pegasus.client.request.batch"/>
<Package name="com.xiaomi.infra.pegasus.client.request.range"/>
<Package name="com.xiaomi.infra.pegasus.example"/>
</Or>
</Match>
Expand Down
3 changes: 2 additions & 1 deletion scripts/format-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ PROJECT_DIR=$(dirname "${SCRIPT_DIR}")
cd "${PROJECT_DIR}" || exit 1

SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/batch/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/range/*.java
src/main/java/com/xiaomi/infra/pegasus/metrics/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/async/*.java
src/main/java/com/xiaomi/infra/pegasus/operator/*.java
src/main/java/com/xiaomi/infra/pegasus/tools/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/*.java
src/main/java/com/xiaomi/infra/pegasus/base/*.java
src/main/java/com/xiaomi/infra/pegasus/example/*.java
src/main/java/com/xiaomi/infra/pegasus/security/*.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.client.request.batch.Batch;
import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse;
import java.util.*;
import org.apache.commons.lang3.tuple.Pair;

Expand Down Expand Up @@ -111,8 +113,7 @@ public PegasusTableInterface openTable(String tableName, TableOptions tableOptio
* Batch get values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
* more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param values output values; should be created by caller; if succeed, the size of values will
Expand All @@ -131,8 +132,7 @@ public void batchGet(String tableName, List<Pair<byte[], byte[]>> keys, List<byt
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
* List)}
* BatchWithResponse#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
Expand Down Expand Up @@ -182,6 +182,8 @@ public boolean multiGet(
/**
* Get multiple key-values under the same hashKey with sortKey range limited.
*
* @deprecated The API may can't get all records, please use {@linkplain
* com.xiaomi.infra.pegasus.client.request.range.GetRange}
* @param tableName table name
* @param hashKey used to decide which partition the key may exist should not be null or empty.
* @param startSortKey the start sort key. null means "".
Expand All @@ -195,6 +197,7 @@ public boolean multiGet(
* @return true if all data is fetched; false if only partial data is fetched.
* @throws PException throws exception if any error occurs.
*/
@Deprecated
public boolean multiGet(
String tableName,
byte[] hashKey,
Expand All @@ -206,6 +209,7 @@ public boolean multiGet(
List<Pair<byte[], byte[]>> values)
throws PException;

@Deprecated
public boolean multiGet(
String tableName,
byte[] hashKey,
Expand All @@ -220,8 +224,7 @@ public boolean multiGet(
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
* more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
Expand All @@ -242,8 +245,7 @@ public void batchMultiGet(
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
* List)}
* BatchWithResponse#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
Expand Down Expand Up @@ -307,8 +309,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
* Batch set lots of values. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @throws PException throws exception if any error occurs.
Expand All @@ -322,8 +323,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
* Batch set lots of values. Will wait for all requests done even if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param results output results; should be created by caller; after call done, the size of
Expand Down Expand Up @@ -360,8 +360,7 @@ public void multiSet(String tableName, byte[] hashKey, List<Pair<byte[], byte[]>
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
Expand All @@ -381,8 +380,7 @@ public void batchMultiSet(String tableName, List<HashKeyData> items, int ttlSeco
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
Expand Down Expand Up @@ -420,8 +418,7 @@ public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PExcep
* Batch delete values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @throws PException throws exception if any error occurs.
Expand All @@ -436,8 +433,8 @@ public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PExcep
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. TThe latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. TThe latest batch operation please see {@link Batch#commitWaitAllComplete(List,
* List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
Expand Down Expand Up @@ -467,13 +464,16 @@ public int batchDel2(String tableName, List<Pair<byte[], byte[]>> keys, List<PEx
* Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate
* immediately if any error occurs.
*
* @deprecated the latest usage please see {@linkplain
* com.xiaomi.infra.pegasus.client.request.range.DeleteRange}
* @param tableName table name
* @param hashKey used to decide which partition the key may exist should not be null or empty.
* @param startSortKey the start sort key. null means "".
* @param stopSortKey the stop sort key. null or "" means fetch to the last sort key.
* @param options del range options.
* @throws PException throws exception if any error occurs.
*/
@Deprecated
public void delRange(
String tableName,
byte[] hashKey,
Expand All @@ -487,8 +487,7 @@ public void delRange(
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}
* @throws PException throws exception if any error occurs.
Expand All @@ -504,8 +503,7 @@ public void batchMultiDel(String tableName, List<Pair<byte[], List<byte[]>>> key
* if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}
* @param results output results; should be created by caller; after call done, the size of
Expand Down
83 changes: 12 additions & 71 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.xiaomi.infra.pegasus.base.blob;
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.base.gpid;
import com.xiaomi.infra.pegasus.client.request.range.GetRange;
import com.xiaomi.infra.pegasus.operator.*;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
Expand Down Expand Up @@ -43,6 +44,10 @@ public PegasusTable(PegasusClient client, Table table) {
this.metaList = client.getMetaList();
}

public int getDefaultTimeout() {
return defaultTimeout;
}

@Override
public Future<Boolean> asyncExist(byte[] hashKey, byte[] sortKey, int timeout) {
final DefaultPromise<Boolean> promise = table.newPromise();
Expand Down Expand Up @@ -1173,16 +1178,13 @@ public int batchMultiGet2(
public MultiGetSortKeysResult multiGetSortKeys(
byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout) throws PException {
if (timeout <= 0) timeout = defaultTimeout;
MultiGetSortKeysResult sortKeysResult = new MultiGetSortKeysResult();
sortKeysResult.keys = new ArrayList<>();
ScanOptions options = new ScanOptions();
options.noValue = true;
ScanRangeResult result = scanRange(hashKey, null, null, options, maxFetchCount, timeout);
for (Pair<Pair<byte[], byte[]>, byte[]> pair : result.results) {
sortKeysResult.keys.add(pair.getLeft().getValue());
}
sortKeysResult.allFetched = result.allFetched;
return sortKeysResult;
ScanOptions scanOptions = new ScanOptions();
scanOptions.noValue = true;
GetRange getRange = new GetRange(this, hashKey, timeout);
return getRange
.withOptions(scanOptions)
.commitAndWait(maxFetchCount)
.convertMultiGetSortKeysResult();
}

@Override
Expand Down Expand Up @@ -1818,67 +1820,6 @@ public List<PegasusScannerInterface> getUnorderedScanners(
return ret;
}

/**
* {@linkplain #scanRange(byte[], byte[], byte[], ScanOptions, int, int)} result, if fetch all
* data for {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true
*/
static class ScanRangeResult {
public List<Pair<Pair<byte[], byte[]>, byte[]>> results;
public boolean allFetched;
}

/**
* get scan result for {startSortKey, stopSortKey} within hashKey
*
* @param hashKey used to decide which partition to put this k-v,
* @param startSortKey start sort key scan from if null or length == 0, means start from begin
* @param stopSortKey stop sort key scan to if null or length == 0, means stop to end
* @param options scan options like endpoint inclusive/exclusive
* @param maxFetchCount max count of k-v pairs to be fetched. if <=0 means fetch all data for
* {startSortKey, stopSortKey}
* @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with
* "timeout" of config
* @return ScanRangeResult result{pair((hashKey, sortKey), value}, if fetch all data for
* {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true
* @throws PException
*/
ScanRangeResult scanRange(
byte[] hashKey,
byte[] startSortKey,
byte[] stopSortKey,
ScanOptions options,
int maxFetchCount,
int timeout /*ms*/)
throws PException {
if (timeout <= 0) timeout = defaultTimeout;
long deadlineTime = System.currentTimeMillis() + timeout;

PegasusScannerInterface pegasusScanner =
getScanner(hashKey, startSortKey, stopSortKey, options);
ScanRangeResult scanRangeResult = new ScanRangeResult();
scanRangeResult.allFetched = false;
scanRangeResult.results = new ArrayList<>();
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
}

Pair<Pair<byte[], byte[]>, byte[]> pair;
while ((pair = pegasusScanner.next()) != null
&& (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) {
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
}
scanRangeResult.results.add(pair);
}

if (pegasusScanner.next() == null) {
scanRangeResult.allFetched = true;
}
return scanRangeResult;
}

public void handleReplicaException(
Request request, DefaultPromise promise, client_operator op, Table table, int timeout) {
if (timeout <= 0) timeout = defaultTimeout;
Expand Down
Loading