Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

refactor: refactor getRange operation using scanner #154

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
init
foreverneverer committed Mar 16, 2021
commit 014752d0fcb14df3d17255f09f1e106211d0a1dd
3 changes: 2 additions & 1 deletion scripts/format-all.sh
Original file line number Diff line number Diff line change
@@ -24,13 +24,14 @@ PROJECT_DIR=$(dirname "${SCRIPT_DIR}")
cd "${PROJECT_DIR}" || exit 1

SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/batch/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/range/*.java
src/main/java/com/xiaomi/infra/pegasus/metrics/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/async/*.java
src/main/java/com/xiaomi/infra/pegasus/operator/*.java
src/main/java/com/xiaomi/infra/pegasus/tools/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/*.java
src/main/java/com/xiaomi/infra/pegasus/base/*.java
src/main/java/com/xiaomi/infra/pegasus/example/*.java
src/main/java/com/xiaomi/infra/pegasus/security/*.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,73 @@
package com.xiaomi.infra.pegasus.client.request.range;

import com.xiaomi.infra.pegasus.client.ScanOptions;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.PegasusTable;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.tuple.Pair;

public class DeleteRange extends Scan{
public class DeleteRange extends Range<Boolean> {
public PegasusTableInterface table;
private byte[] nextSortKey;

public DeleteRange(byte[] hashKey, int timeout) {
super(hashKey, timeout);
public DeleteRange(PegasusTableInterface table, byte[] hashKey, int timeout) {
super(hashKey, timeout);
this.table = table;
}

public Boolean commitAndWait(int maxDeleteCount)
throws PException, InterruptedException, ExecutionException, TimeoutException {
this.scanOptions.noValue = true;
ScannerWrapper<Boolean> scannerWrapper = new ScannerWrapper<>(table, this);

if (timeout <= 0) timeout = ((PegasusTable) table).getDefaultTimeout();
long deadlineTime = System.currentTimeMillis() + timeout;
PegasusTable.ScanRangeResult result = scannerWrapper.hashScan(maxDeleteCount, timeout);

if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
((PegasusTable) table).getMetaList(),
((PegasusTable) table).getTable().getTableName(),
new PegasusTable.Request(hashKey),
timeout,
new TimeoutException());
}

List<byte[]> sortKeys = new ArrayList<>();
int remainingTime = (int) (deadlineTime - System.currentTimeMillis());
for (Pair<Pair<byte[], byte[]>, byte[]> pair : result.results) {
remainingTime = (int) (deadlineTime - System.currentTimeMillis());
sortKeys.add(pair.getKey().getValue());
if (sortKeys.size() == this.scanOptions.batchSize) {
nextSortKey = sortKeys.get(0);
table
.asyncMultiDel(hashKey, sortKeys, remainingTime)
.get(remainingTime, TimeUnit.MILLISECONDS);
if (remainingTime <= 0) {
throw PException.timeout(
((PegasusTable) table).getMetaList(),
((PegasusTable) table).getTable().getTableName(),
new PegasusTable.Request(hashKey),
timeout,
new TimeoutException());
}
sortKeys.clear();
}
}
if (!sortKeys.isEmpty()) {
table
.asyncMultiDel(hashKey, sortKeys, remainingTime)
.get(remainingTime, TimeUnit.MILLISECONDS);
nextSortKey = null;
}
return result.allFetched;
}

public byte[] getNextSortKey() {
return nextSortKey;
}
}
Original file line number Diff line number Diff line change
@@ -3,20 +3,17 @@
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.PegasusTable;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import com.xiaomi.infra.pegasus.client.ScanOptions;

public class GetRange extends Scan{
public PegasusTableInterface table;
public int maxFetchCount;
public class GetRange extends Range<PegasusTable.ScanRangeResult> {
public PegasusTableInterface table;

public GetRange(PegasusTableInterface table, byte[] hashKey, int maxFetchCount, int timeout) {
super(hashKey, timeout);
this.table = table;
this.maxFetchCount = maxFetchCount;
}
public GetRange(PegasusTableInterface table, byte[] hashKey, int timeout) {
super(hashKey, timeout);
this.table = table;
}

public PegasusTable.ScanRangeResult commitAndWait() throws PException {
ScannerWrapper scannerWrapper = new ScannerWrapper(table);
return scannerWrapper.hashScan(hashKey, startSortKey, stopSortKey, scanOptions, maxFetchCount, timeout);
}
public PegasusTable.ScanRangeResult commitAndWait(int maxFetchCount) throws PException {
ScannerWrapper<PegasusTable.ScanRangeResult> scannerWrapper = new ScannerWrapper<>(table, this);
return scannerWrapper.hashScan(maxFetchCount, timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.xiaomi.infra.pegasus.client.request.range;

import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.ScanOptions;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public abstract class Range<Response> {
protected byte[] hashKey;
protected int timeout;

protected ScanOptions scanOptions = new ScanOptions();
protected byte[] startSortKey;
protected byte[] stopSortKey;

protected Range(byte[] hashKey, int timeout) {
this.hashKey = hashKey;
this.timeout = timeout;
}

protected abstract Response commitAndWait(int maxRangeCount)
throws PException, InterruptedException, ExecutionException, TimeoutException;

protected Range<Response> withOptions(ScanOptions scanOptions) {
this.scanOptions = scanOptions;
return this;
}

protected Range<Response> withStartSortKey(byte[] startSortKey) {
this.startSortKey = startSortKey;
return this;
}

protected Range<Response> withStopSortKey(byte[] stopSortKey) {
this.stopSortKey = stopSortKey;
return this;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,57 +1,56 @@
package com.xiaomi.infra.pegasus.client.request.range;

import com.xiaomi.infra.pegasus.client.*;
import com.xiaomi.infra.pegasus.rpc.Table;
import org.apache.commons.lang3.tuple.Pair;

import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.tuple.Pair;

public class ScannerWrapper {
private final PegasusTableInterface table;

public ScannerWrapper(PegasusTableInterface table) {
this.table = table;
public class ScannerWrapper<Response> {
private final Range<Response> request;
private final PegasusTableInterface table;
private final PegasusScannerInterface scanner;

public ScannerWrapper(PegasusTableInterface table, Range<Response> request) throws PException {
this.table = table;
this.request = request;
this.scanner =
table.getScanner(
request.hashKey, request.startSortKey, request.stopSortKey, request.scanOptions);
}

PegasusTable.ScanRangeResult hashScan(int maxFetchCount, int timeout /*ms*/) throws PException {
if (timeout <= 0) timeout = ((PegasusTable) table).getDefaultTimeout();
long deadlineTime = System.currentTimeMillis() + timeout;

PegasusTable.ScanRangeResult scanRangeResult = new PegasusTable.ScanRangeResult();
scanRangeResult.allFetched = false;
scanRangeResult.results = new ArrayList<>();
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
((PegasusTable) table).getMetaList(),
((PegasusTable) table).getTable().getTableName(),
new PegasusTable.Request(request.hashKey),
timeout,
new TimeoutException());
}

PegasusTable.ScanRangeResult hashScan(
byte[] hashKey,
byte[] startSortKey,
byte[] stopSortKey,
ScanOptions options,
int maxFetchCount,
int timeout /*ms*/)
throws PException {
if (timeout <= 0) timeout = ((PegasusTable)table).getDefaultTimeout();
long deadlineTime = System.currentTimeMillis() + timeout;

PegasusScannerInterface pegasusScanner =
table.getScanner(hashKey, startSortKey, stopSortKey, options);
PegasusTable.ScanRangeResult scanRangeResult = new PegasusTable.ScanRangeResult();
scanRangeResult.allFetched = false;
scanRangeResult.results = new ArrayList<>();
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
((PegasusTable)table).getMetaList(),
((PegasusTable)table).getTable().getTableName(),
new PegasusTable.Request(hashKey), timeout, new TimeoutException());
}

Pair<Pair<byte[], byte[]>, byte[]> pair;
while ((pair = pegasusScanner.next()) != null
&& (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) {
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
((PegasusTable)table).getMetaList(),
((PegasusTable)table).getTable().getTableName(),
new PegasusTable.Request(hashKey), timeout, new TimeoutException());
}
scanRangeResult.results.add(pair);
}
Pair<Pair<byte[], byte[]>, byte[]> pair;
while ((pair = scanner.next()) != null
&& (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) {
if (System.currentTimeMillis() >= deadlineTime) {
throw PException.timeout(
((PegasusTable) table).getMetaList(),
((PegasusTable) table).getTable().getTableName(),
new PegasusTable.Request(request.hashKey),
timeout,
new TimeoutException());
}
scanRangeResult.results.add(pair);
}

if (pegasusScanner.next() == null) {
scanRangeResult.allFetched = true;
}
return scanRangeResult;
}
if (scanner.next() == null) {
scanRangeResult.allFetched = true;
}
return scanRangeResult;
}
}