Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

refactor: refactor getRange operation using scanner #154

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dev-support/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<Package name="com.xiaomi.infra.pegasus.client"/>
<Package name="com.xiaomi.infra.pegasus.metrics"/>
<Package name="com.xiaomi.infra.pegasus.tools"/>
<Package name="com.xiaomi.infra.pegasus.client.request"/>
<Package name="com.xiaomi.infra.pegasus.client.request.batch"/>
<Package name="com.xiaomi.infra.pegasus.client.request.range"/>
<Package name="com.xiaomi.infra.pegasus.example"/>
</Or>
</Match>
Expand Down
3 changes: 2 additions & 1 deletion scripts/format-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ PROJECT_DIR=$(dirname "${SCRIPT_DIR}")
cd "${PROJECT_DIR}" || exit 1

SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/batch/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/range/*.java
src/main/java/com/xiaomi/infra/pegasus/metrics/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/async/*.java
src/main/java/com/xiaomi/infra/pegasus/operator/*.java
src/main/java/com/xiaomi/infra/pegasus/tools/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/*.java
src/main/java/com/xiaomi/infra/pegasus/base/*.java
src/main/java/com/xiaomi/infra/pegasus/example/*.java
src/main/java/com/xiaomi/infra/pegasus/security/*.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.client.request.batch.Batch;
import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse;
import java.util.*;
import org.apache.commons.lang3.tuple.Pair;

Expand Down Expand Up @@ -111,8 +113,7 @@ public PegasusTableInterface openTable(String tableName, TableOptions tableOptio
* Batch get values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
* more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param values output values; should be created by caller; if succeed, the size of values will
Expand All @@ -131,8 +132,7 @@ public void batchGet(String tableName, List<Pair<byte[], byte[]>> keys, List<byt
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
* List)}
* BatchWithResponse#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
Expand Down Expand Up @@ -182,6 +182,8 @@ public boolean multiGet(
/**
* Get multiple key-values under the same hashKey with sortKey range limited.
*
* @deprecated The API may can't get all records, please use {@linkplain
* com.xiaomi.infra.pegasus.client.request.range.GetRange}
* @param tableName table name
* @param hashKey used to decide which partition the key may exist should not be null or empty.
* @param startSortKey the start sort key. null means "".
Expand All @@ -195,6 +197,7 @@ public boolean multiGet(
* @return true if all data is fetched; false if only partial data is fetched.
* @throws PException throws exception if any error occurs.
*/
@Deprecated
public boolean multiGet(
String tableName,
byte[] hashKey,
Expand All @@ -206,6 +209,7 @@ public boolean multiGet(
List<Pair<byte[], byte[]>> values)
throws PException;

@Deprecated
public boolean multiGet(
String tableName,
byte[] hashKey,
Expand All @@ -220,8 +224,7 @@ public boolean multiGet(
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
* more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
Expand All @@ -242,8 +245,7 @@ public void batchMultiGet(
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
* List)}
* BatchWithResponse#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
Expand Down Expand Up @@ -307,8 +309,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
* Batch set lots of values. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @throws PException throws exception if any error occurs.
Expand All @@ -322,8 +323,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
* Batch set lots of values. Will wait for all requests done even if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param results output results; should be created by caller; after call done, the size of
Expand Down Expand Up @@ -360,8 +360,7 @@ public void multiSet(String tableName, byte[] hashKey, List<Pair<byte[], byte[]>
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
Expand All @@ -381,8 +380,7 @@ public void batchMultiSet(String tableName, List<HashKeyData> items, int ttlSeco
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
Expand Down Expand Up @@ -420,8 +418,7 @@ public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PExcep
* Batch delete values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @throws PException throws exception if any error occurs.
Expand All @@ -436,8 +433,8 @@ public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PExcep
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. TThe latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. TThe latest batch operation please see {@link Batch#commitWaitAllComplete(List,
* List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
Expand Down Expand Up @@ -467,13 +464,16 @@ public int batchDel2(String tableName, List<Pair<byte[], byte[]>> keys, List<PEx
* Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate
* immediately if any error occurs.
*
* @deprecated the latest usage please see {@linkplain
* com.xiaomi.infra.pegasus.client.request.range.DeleteRange}
* @param tableName table name
* @param hashKey used to decide which partition the key may exist should not be null or empty.
* @param startSortKey the start sort key. null means "".
* @param stopSortKey the stop sort key. null or "" means fetch to the last sort key.
* @param options del range options.
* @throws PException throws exception if any error occurs.
*/
@Deprecated
public void delRange(
String tableName,
byte[] hashKey,
Expand All @@ -487,8 +487,7 @@ public void delRange(
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}
* @throws PException throws exception if any error occurs.
Expand All @@ -504,8 +503,7 @@ public void batchMultiDel(String tableName, List<Pair<byte[], List<byte[]>>> key
* if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}
* @param results output results; should be created by caller; after call done, the size of
Expand Down
83 changes: 12 additions & 71 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.xiaomi.infra.pegasus.base.blob;
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.base.gpid;
import com.xiaomi.infra.pegasus.client.request.range.GetRange;
import com.xiaomi.infra.pegasus.operator.*;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
Expand Down Expand Up @@ -43,6 +44,10 @@ public PegasusTable(PegasusClient client, Table table) {
this.metaList = client.getMetaList();
}

public int getDefaultTimeout() {
return defaultTimeout;
}

@Override
public Future<Boolean> asyncExist(byte[] hashKey, byte[] sortKey, int timeout) {
final DefaultPromise<Boolean> promise = table.newPromise();
Expand Down Expand Up @@ -1173,16 +1178,13 @@ public int batchMultiGet2(
public MultiGetSortKeysResult multiGetSortKeys(
byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout) throws PException {
if (timeout <= 0) timeout = defaultTimeout;
MultiGetSortKeysResult sortKeysResult = new MultiGetSortKeysResult();
sortKeysResult.keys = new ArrayList<>();
ScanOptions options = new ScanOptions();
options.noValue = true;
ScanRangeResult result = scanRange(hashKey, null, null, options, maxFetchCount, timeout);
for (Pair<Pair<byte[], byte[]>, byte[]> pair : result.results) {
sortKeysResult.keys.add(pair.getLeft().getValue());
}
sortKeysResult.allFetched = result.allFetched;
return sortKeysResult;
ScanOptions scanOptions = new ScanOptions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we reserver the old interface and add a new interface for these code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, if like multiGet, origin multiGetSortKeys should also be retained and only marked Deprecated. but the API has been refactored by scan in previous PR and some user has use it, here I just keep it. @neverchanje @hycdong can give some suggestion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is okay to update code in function multiGetSortKeys, because it is already implemented by scan.

scanOptions.noValue = true;
GetRange getRange = new GetRange(this, hashKey, timeout);
return getRange
.withOptions(scanOptions)
.commitAndWait(maxFetchCount)
.convertMultiGetSortKeysResult();
}

@Override
Expand Down Expand Up @@ -1818,67 +1820,6 @@ public List<PegasusScannerInterface> getUnorderedScanners(
return ret;
}

/**
* {@linkplain #scanRange(byte[], byte[], byte[], ScanOptions, int, int)} result, if fetch all
* data for {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true
*/
static class ScanRangeResult {
public List<Pair<Pair<byte[], byte[]>, byte[]>> results;
public boolean allFetched;
}

/**
* get scan result for {startSortKey, stopSortKey} within hashKey
*
* @param hashKey used to decide which partition to put this k-v,
* @param startSortKey start sort key scan from if null or length == 0, means start from begin
* @param stopSortKey stop sort key scan to if null or length == 0, means stop to end
* @param options scan options like endpoint inclusive/exclusive
* @param maxFetchCount max count of k-v pairs to be fetched. if <=0 means fetch all data for
* {startSortKey, stopSortKey}
* @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with
* "timeout" of config
* @return ScanRangeResult result{pair((hashKey, sortKey), value}, if fetch all data for
* {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true
* @throws PException
*/
ScanRangeResult scanRange(
byte[] hashKey,
byte[] startSortKey,
byte[] stopSortKey,
ScanOptions options,
int maxFetchCount,
int timeout /*ms*/)
throws PException {
if (timeout <= 0) timeout = defaultTimeout;
long deadlineTime = System.currentTimeMillis() + timeout;

PegasusScannerInterface pegasusScanner =
getScanner(hashKey, startSortKey, stopSortKey, options);
ScanRangeResult scanRangeResult = new ScanRangeResult();
scanRangeResult.allFetched = false;
scanRangeResult.results = new ArrayList<>();
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
}

Pair<Pair<byte[], byte[]>, byte[]> pair;
while ((pair = pegasusScanner.next()) != null
&& (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) {
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
}
scanRangeResult.results.add(pair);
}

if (pegasusScanner.next() == null) {
scanRangeResult.allFetched = true;
}
return scanRangeResult;
}

public void handleReplicaException(
Request request, DefaultPromise promise, client_operator op, Table table, int timeout) {
if (timeout <= 0) timeout = defaultTimeout;
Expand Down
Loading