Skip to content

Commit

Permalink
removing the lambda to simplify the retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: limbooverlambda <[email protected]>
  • Loading branch information
limbooverlambda committed Aug 5, 2024
1 parent 4740269 commit 032ceca
Showing 1 changed file with 7 additions and 19 deletions.
26 changes: 7 additions & 19 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use core::ops::Range;
use std::future::Future;
use std::pin::Pin;

use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -14,7 +13,7 @@ use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
use crate::proto::{errorpb, metapb};
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::CollectSingle;
use crate::request::EncodeKeyspace;
Expand Down Expand Up @@ -766,10 +765,6 @@ impl<PdC: PdClient> Client<PdC> {
let (start_key, end_key) = range.clone().into_keys();
let mut current_key: Key = start_key;

let region_error_handler =
|pd_rpc_client: Arc<PdC>, err: errorpb::Error, store: RegionStore| {
Box::pin(plan::handle_region_error(pd_rpc_client, err, store))
} as _;
while current_limit > 0 {
let scan_args = ScanInnerArgs {
start_key: current_key.clone(),
Expand All @@ -779,7 +774,7 @@ impl<PdC: PdClient> Client<PdC> {
reverse,
backoff: backoff.clone(),
};
let (res, next_key) = self.retryable_scan(scan_args, region_error_handler).await?;
let (res, next_key) = self.retryable_scan(scan_args).await?;

let mut kvs = res
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
Expand All @@ -805,18 +800,10 @@ impl<PdC: PdClient> Client<PdC> {
Ok(result)
}

async fn retryable_scan<'a, F>(
async fn retryable_scan(
&self,
mut scan_args: ScanInnerArgs,
mut error_handler: F,
) -> Result<(Option<RawScanResponse>, Key)>
where
F: FnMut(
Arc<PdC>,
errorpb::Error,
RegionStore,
) -> Pin<Box<dyn Future<Output = Result<bool>>>>,
{
) -> Result<(Option<RawScanResponse>, Key)> {
let start_key = scan_args.start_key;

let region = self.rpc.clone().region_for_key(&start_key).await?;
Expand All @@ -834,7 +821,8 @@ impl<PdC: PdClient> Client<PdC> {
Ok(mut r) => {
if let Some(err) = r.region_error() {
let status =
error_handler(self.rpc.clone(), err.clone(), store.clone()).await?;
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
.await?;
if status {
continue;
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
Expand Down

0 comments on commit 032ceca

Please sign in to comment.