Skip to content

Commit

Permalink
Revert "*: Support API v2 (part 1) (tikv#415)"
Browse files Browse the repository at this point in the history
This reverts commit 4b0e844.
andylokandy committed Dec 14, 2023
1 parent bbaf317 commit 4db9895
Showing 15 changed files with 95 additions and 306 deletions.
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -94,8 +94,6 @@

pub mod backoff;
#[doc(hidden)]
pub mod proto; // export `proto` to enable user customized codec
#[doc(hidden)]
pub mod raw;
pub mod request;
#[doc(hidden)]
@@ -106,6 +104,7 @@ mod compat;
mod config;
mod kv;
mod pd;
mod proto;
mod region;
mod region_cache;
mod stats;
@@ -146,8 +145,6 @@ pub use crate::raw::Client as RawClient;
#[doc(inline)]
pub use crate::raw::ColumnFamily;
#[doc(inline)]
pub use crate::request::codec;
#[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::Timestamp;
21 changes: 2 additions & 19 deletions src/mock.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::request::codec::ApiV1TxnCodec;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::Request;
@@ -31,7 +30,7 @@ use crate::Timestamp;

/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCluster> {
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
PdRpcClient::new(
config.clone(),
@@ -44,7 +43,6 @@ pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCl
))
},
false,
Some(ApiV1TxnCodec::default()),
)
.await
.unwrap()
@@ -73,18 +71,9 @@ pub struct MockKvConnect;

pub struct MockCluster;

#[derive(new)]
pub struct MockPdClient {
client: MockKvClient,
codec: ApiV1TxnCodec,
}

impl MockPdClient {
pub fn new(client: MockKvClient) -> MockPdClient {
MockPdClient {
client,
codec: ApiV1TxnCodec::default(),
}
}
}

#[async_trait]
@@ -113,7 +102,6 @@ impl MockPdClient {
pub fn default() -> MockPdClient {
MockPdClient {
client: MockKvClient::default(),
codec: ApiV1TxnCodec::default(),
}
}

@@ -177,7 +165,6 @@ impl MockPdClient {

#[async_trait]
impl PdClient for MockPdClient {
type Codec = ApiV1TxnCodec;
type KvClient = MockKvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
@@ -227,8 +214,4 @@ impl PdClient for MockPdClient {
}

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

fn get_codec(&self) -> &Self::Codec {
&self.codec
}
}
63 changes: 17 additions & 46 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region_cache::RegionCache;
use crate::request::codec::{ApiV1TxnCodec, Codec};
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::TikvConnect;
@@ -51,7 +50,6 @@ use crate::Timestamp;
/// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
#[async_trait]
pub trait PdClient: Send + Sync + 'static {
type Codec: Codec;
type KvClient: KvClient + Send + Sync + 'static;

/// In transactional API, `region` is decoded (keys in raw format).
@@ -193,11 +191,8 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}

fn decode_region(
mut region: RegionWithLeader,
enable_mvcc_codec: bool,
) -> Result<RegionWithLeader> {
if enable_mvcc_codec {
fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
if enable_codec {
codec::decode_bytes_in_place(&mut region.region.start_key, false)?;
codec::decode_bytes_in_place(&mut region.region.end_key, false)?;
}
@@ -207,30 +202,20 @@ pub trait PdClient: Send + Sync + 'static {
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);

/// Get the codec carried by `PdClient`.
/// The purpose of carrying the codec is to avoid passing it on so many calling paths.
fn get_codec(&self) -> &Self::Codec;
}

/// This client converts requests for the logical TiKV cluster into requests
/// for a single TiKV store using PD and internal logic.
pub struct PdRpcClient<
Cod: Codec = ApiV1TxnCodec,
KvC: KvConnect + Send + Sync + 'static = TikvConnect,
Cl = Cluster,
> {
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_mvcc_codec: bool,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
codec: Option<Cod>,
}

#[async_trait]
impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<Cod, KvC> {
type Codec = Cod;
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
type KvClient = KvC::KvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
@@ -241,20 +226,20 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClien
}

async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let enable_mvcc_codec = self.enable_mvcc_codec;
let key = if enable_mvcc_codec {
let enable_codec = self.enable_codec;
let key = if enable_codec {
key.to_encoded()
} else {
key.clone()
};

let region = self.region_cache.get_region_by_key(&key).await?;
Self::decode_region(region, enable_mvcc_codec)
Self::decode_region(region, enable_codec)
}

async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
let region = self.region_cache.get_region_by_id(id).await?;
Self::decode_region(region, self.enable_mvcc_codec)
Self::decode_region(region, self.enable_codec)
}

async fn all_stores(&self) -> Result<Vec<Store>> {
@@ -282,40 +267,31 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClien
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}

fn get_codec(&self) -> &Self::Codec {
self.codec
.as_ref()
.unwrap_or_else(|| panic!("codec not set"))
}
}

impl<Cod: Codec> PdRpcClient<Cod, TikvConnect, Cluster> {
impl PdRpcClient<TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: Config,
enable_mvcc_codec: bool, // TODO: infer from `codec`.
codec: Option<Cod>,
) -> Result<PdRpcClient<Cod>> {
enable_codec: bool,
) -> Result<PdRpcClient> {
PdRpcClient::new(
config.clone(),
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
enable_mvcc_codec,
codec,
enable_codec,
)
.await
}
}

impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, KvC, Cl> {
impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_mvcc_codec: bool,
codec: Option<Cod>,
) -> Result<PdRpcClient<Cod, KvC, Cl>>
enable_codec: bool,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<SecurityManager>) -> KvC,
@@ -337,9 +313,8 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, Kv
pd: pd.clone(),
kv_client_cache,
kv_connect: kv_connect(security_mgr),
enable_mvcc_codec,
enable_codec,
region_cache: RegionCache::new(pd),
codec,
})
}

@@ -359,10 +334,6 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, Kv
Err(e) => Err(e),
}
}

pub fn set_codec(&mut self, codec: Cod) {
self.codec = Some(codec);
}
}

fn make_key_range(start_key: Vec<u8>, end_key: Vec<u8>) -> kvrpcpb::KeyRange {
51 changes: 15 additions & 36 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@ use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::codec::{ApiV1RawCodec, Codec, EncodedRequest};
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::Plan;
@@ -36,11 +35,7 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
///
/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
/// awaited to execute.
pub struct Client<Cod = ApiV1RawCodec, PdC = PdRpcClient<Cod>>
where
Cod: Codec,
PdC: PdClient<Codec = Cod>,
{
pub struct Client<PdC: PdClient = PdRpcClient> {
rpc: Arc<PdC>,
cf: Option<ColumnFamily>,
backoff: Backoff,
@@ -59,7 +54,7 @@ impl Clone for Client {
}
}

impl Client<ApiV1RawCodec, PdRpcClient<ApiV1RawCodec>> {
impl Client<PdRpcClient> {
/// Create a raw [`Client`] and connect to the TiKV cluster.
///
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -105,10 +100,7 @@ impl Client<ApiV1RawCodec, PdRpcClient<ApiV1RawCodec>> {
config: Config,
) -> Result<Self> {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc = Arc::new(
PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1RawCodec::default()))
.await?,
);
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?);
Ok(Client {
rpc,
cf: None,
@@ -150,9 +142,7 @@ impl Client<ApiV1RawCodec, PdRpcClient<ApiV1RawCodec>> {
atomic: self.atomic,
}
}
}

impl<Cod: Codec> Client<Cod, PdRpcClient<Cod>> {
/// Set the [`Backoff`] strategy for retrying requests.
/// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF).
/// See [`Backoff`] for more information.
@@ -199,7 +189,7 @@ impl<Cod: Codec> Client<Cod, PdRpcClient<Cod>> {
}
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
impl<PdC: PdClient> Client<PdC> {
/// Create a new 'get' request.
///
/// Once resolved this request will result in the fetching of the value associated with the
@@ -221,8 +211,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.post_process_default()
@@ -254,8 +243,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
) -> Result<Vec<KvPair>> {
debug!("invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
@@ -283,8 +271,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!("invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.extract_error()
@@ -320,8 +307,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
self.cf.clone(),
self.atomic,
);
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
@@ -349,8 +335,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
debug!("invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.extract_error()
@@ -381,8 +366,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
@@ -409,8 +393,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
debug!("invoking raw delete_range request");
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
@@ -566,8 +549,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
previous_value.into(),
self.cf.clone(),
);
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.post_process_default()
@@ -590,8 +572,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
ranges.into_iter().map(Into::into),
request_builder,
);
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.preserve_shard()
.retry_multi_region(self.backoff.clone())
.post_process_default()
@@ -625,8 +606,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region_with_store(region_store.clone())
.await?
.plan()
@@ -681,8 +661,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
key_only,
self.cf.clone(),
);
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
11 changes: 4 additions & 7 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ use super::RawRpcRequest;
use crate::collect_first;
use crate::pd::PdClient;
use crate::proto::kvrpcpb;
use crate::proto::kvrpcpb::ApiVersion;
use crate::proto::metapb;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::range_request;
@@ -164,7 +163,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
self.pairs = shard;
Ok(())
}
@@ -297,7 +296,7 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
self.ranges = shard;
Ok(())
}
@@ -403,7 +402,7 @@ impl Request for RawCoprocessorRequest {
self.inner.set_context(context);
}

fn set_api_version(&mut self, api_version: ApiVersion) {
fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
self.inner.set_api_version(api_version);
}
}
@@ -505,7 +504,6 @@ mod test {
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
use crate::request::codec::EncodedRequest;
use crate::request::Plan;
use crate::Key;

@@ -540,8 +538,7 @@ mod test {
key_only: true,
..Default::default()
};
let encoded_scan = EncodedRequest::new(scan, client.get_codec());
let plan = crate::request::PlanBuilder::new(client, encoded_scan)
let plan = crate::request::PlanBuilder::new(client, scan)
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
54 changes: 0 additions & 54 deletions src/request/codec.rs

This file was deleted.

19 changes: 6 additions & 13 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@ use crate::store::Request;
use crate::store::{HasKeyErrors, Store};
use crate::transaction::HasLocks;

pub mod codec;
pub mod plan;
mod plan_builder;
mod shard;
@@ -43,9 +42,6 @@ mod shard;
pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
/// The expected response to the request.
type Response: HasKeyErrors + HasLocks + Clone + Send + 'static;

// TODO: fn encode_request()
// TODO: fn decode_response()
}

/// For requests or plans which are handled at TiKV store (other than region) level.
@@ -98,12 +94,9 @@ mod test {
use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::pd::PdClient;
use crate::proto::kvrpcpb;
use crate::proto::kvrpcpb::ApiVersion;
use crate::proto::pdpb::Timestamp;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::request::codec::EncodedRequest;
use crate::store::store_stream_for_keys;
use crate::store::HasRegionError;
use crate::transaction::lowering::new_commit_request;
@@ -153,7 +146,9 @@ mod test {
unreachable!();
}

fn set_api_version(&mut self, _api_version: ApiVersion) {}
fn set_api_version(&mut self, _: kvrpcpb::ApiVersion) {
unreachable!();
}
}

#[async_trait]
@@ -199,8 +194,7 @@ mod test {
|_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box<dyn Any>),
)));

let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
.extract_error()
@@ -224,17 +218,16 @@ mod test {

let key: Key = "key".to_owned().into();
let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default());
let encoded_req = EncodedRequest::new(req, pd_client.get_codec());

// does not extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone())
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone())
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.plan();
assert!(plan.execute().await.is_ok());

// extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.extract_error()
5 changes: 2 additions & 3 deletions src/request/plan_builder.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ use std::sync::Arc;
use super::plan::PreserveShard;
use crate::backoff::Backoff;
use crate::pd::PdClient;
use crate::request::codec::EncodedRequest;
use crate::request::plan::{CleanupLocks, RetryableAllStores};
use crate::request::shard::HasNextBatch;
use crate::request::Dispatch;
@@ -47,11 +46,11 @@ pub struct Targetted;
impl PlanBuilderPhase for Targetted {}

impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
pub fn new(pd_client: Arc<PdC>, encoded_request: EncodedRequest<Req>) -> Self {
pub fn new(pd_client: Arc<PdC>, request: Req) -> Self {
PlanBuilder {
pd_client,
plan: Dispatch {
request: encoded_request.inner,
request,
kv_client: None,
},
phantom: PhantomData,
6 changes: 3 additions & 3 deletions src/request/shard.rs
Original file line number Diff line number Diff line change
@@ -164,7 +164,7 @@ macro_rules! shardable_key {
mut shard: Self::Shard,
store: &$crate::store::RegionStore,
) -> $crate::Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
assert!(shard.len() == 1);
self.key = shard.pop().unwrap();
Ok(())
@@ -197,7 +197,7 @@ macro_rules! shardable_keys {
shard: Self::Shard,
store: &$crate::store::RegionStore,
) -> $crate::Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
self.keys = shard.into_iter().map(Into::into).collect();
Ok(())
}
@@ -257,7 +257,7 @@ macro_rules! shardable_range {
shard: Self::Shard,
store: &$crate::store::RegionStore,
) -> $crate::Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);

// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
// As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request.
8 changes: 0 additions & 8 deletions src/store/request.rs
Original file line number Diff line number Diff line change
@@ -21,8 +21,6 @@ pub trait Request: Any + Sync + Send + 'static {
) -> Result<Box<dyn Any>>;
fn label(&self) -> &'static str;
fn as_any(&self) -> &dyn Any;
/// Set the context for the request.
/// Should always use `set_context` other than modify the `self.context` directly.
fn set_context(&mut self, context: kvrpcpb::Context);
fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion);
}
@@ -55,13 +53,7 @@ macro_rules! impl_request {
}

fn set_context(&mut self, context: kvrpcpb::Context) {
let api_version = self
.context
.as_ref()
.map(|c| c.api_version)
.unwrap_or_default();
self.context = Some(context);
self.set_api_version(kvrpcpb::ApiVersion::try_from(api_version).unwrap());
}

fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
68 changes: 14 additions & 54 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::pdpb::Timestamp;
use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest};
use crate::request::plan::CleanupLocksResult;
use crate::request::Plan;
use crate::timestamp::TimestampExt;
@@ -44,19 +43,19 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
///
/// The returned results of transactional requests are [`Future`](std::future::Future)s that must be
/// awaited to execute.
pub struct Client<Cod: Codec = ApiV1TxnCodec> {
pd: Arc<PdRpcClient<Cod>>,
pub struct Client {
pd: Arc<PdRpcClient>,
}

impl<Cod: Codec> Clone for Client<Cod> {
impl Clone for Client {
fn clone(&self) -> Self {
Self {
pd: self.pd.clone(),
}
}
}

impl Client<ApiV1TxnCodec> {
impl Client {
/// Create a transactional [`Client`] and connect to the TiKV cluster.
///
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -73,6 +72,7 @@ impl Client<ApiV1TxnCodec> {
/// # });
/// ```
pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
// debug!("creating transactional client");
Self::new_with_config(pd_endpoints, Config::default()).await
}

@@ -101,35 +101,9 @@ impl Client<ApiV1TxnCodec> {
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client> {
Self::new_with_codec(pd_endpoints, config, ApiV1TxnCodec::default()).await
}
}

impl Client<ApiV2TxnCodec> {
pub async fn new_with_config_v2<S: Into<String>>(
_keyspace_name: &str,
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client<ApiV2TxnCodec>> {
debug!("creating new transactional client APIv2");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?;
let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name)
pd.set_codec(ApiV2TxnCodec::new(keyspace_id));
Ok(Client { pd: Arc::new(pd) })
}
}

impl<Cod: Codec> Client<Cod> {
pub async fn new_with_codec<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
codec: Cod,
) -> Result<Client<Cod>> {
debug!("creating new transactional client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd =
Arc::new(PdRpcClient::<Cod>::connect(&pd_endpoints, config, true, Some(codec)).await?);
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?);
Ok(Client { pd })
}

@@ -153,7 +127,7 @@ impl<Cod: Codec> Client<Cod> {
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub async fn begin_optimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
pub async fn begin_optimistic(&self) -> Result<Transaction> {
debug!("creating new optimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
@@ -176,7 +150,7 @@ impl<Cod: Codec> Client<Cod> {
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub async fn begin_pessimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
pub async fn begin_pessimistic(&self) -> Result<Transaction> {
debug!("creating new pessimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
@@ -199,21 +173,14 @@ impl<Cod: Codec> Client<Cod> {
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub async fn begin_with_options(
&self,
options: TransactionOptions,
) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
debug!("creating new customized transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, options))
}

/// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
pub fn snapshot(
&self,
timestamp: Timestamp,
options: TransactionOptions,
) -> Snapshot<Cod, PdRpcClient<Cod>> {
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
debug!("creating new snapshot");
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
}
@@ -280,8 +247,7 @@ impl<Cod: Codec> Client<Cod> {
let ctx = ResolveLocksContext::default();
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
.cleanup_locks(ctx.clone(), options, backoff)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
@@ -300,8 +266,7 @@ impl<Cod: Codec> Client<Cod> {
batch_size: u32,
) -> Result<Vec<crate::proto::kvrpcpb::LockInfo>> {
let req = new_scan_lock_request(range.into(), safepoint, batch_size);
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(crate::request::Collect)
.plan();
@@ -317,19 +282,14 @@ impl<Cod: Codec> Client<Cod> {
/// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB.
pub async fn unsafe_destroy_range(&self, range: impl Into<BoundRange>) -> Result<()> {
let req = new_unsafe_destroy_range_request(range.into());
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
.all_stores(DEFAULT_STORE_BACKOFF)
.merge(crate::request::Collect)
.plan();
plan.execute().await
}

fn new_transaction(
&self,
timestamp: Timestamp,
options: TransactionOptions,
) -> Transaction<Cod, PdRpcClient<Cod>> {
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
Transaction::new(timestamp, self.pd.clone(), options)
}
}
17 changes: 6 additions & 11 deletions src/transaction/lock.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ use crate::proto::kvrpcpb;
use crate::proto::kvrpcpb::TxnInfo;
use crate::proto::pdpb::Timestamp;
use crate::region::RegionVerId;
use crate::request::codec::EncodedRequest;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::Plan;
@@ -76,8 +75,7 @@ pub async fn resolve_locks(
Some(&commit_version) => commit_version,
None => {
let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
@@ -118,8 +116,8 @@ async fn resolve_lock_with_retry(
let store = pd_client.clone().store_for_key(key.into()).await?;
let ver_id = store.region_with_leader.ver_id();
let request = requests::new_resolve_lock_request(start_version, commit_version);
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
// The only place where single-region is used
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.single_region_with_store(store)
.await?
.resolve_lock(Backoff::no_backoff())
@@ -359,8 +357,7 @@ impl LockResolver {
force_sync_commit,
resolving_pessimistic_lock,
);
let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.extract_error()
@@ -384,8 +381,7 @@ impl LockResolver {
txn_id: u64,
) -> Result<SecondaryLocksStatus> {
let req = new_check_secondary_locks_request(keys, txn_id);
let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.merge(Collect)
@@ -401,8 +397,7 @@ impl LockResolver {
) -> Result<RegionVerId> {
let ver_id = store.region_with_leader.ver_id();
let request = requests::new_batch_resolve_lock_request(txn_infos.clone());
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.single_region_with_store(store.clone())
.await?
.extract_error()
13 changes: 6 additions & 7 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
@@ -39,7 +39,6 @@ use crate::shardable_keys;
use crate::shardable_range;
use crate::store::store_stream_for_range;
use crate::store::RegionStore;
use crate::store::Request;
use crate::store::{store_stream_for_keys, Store};
use crate::timestamp::TimestampExt;
use crate::transaction::HasLocks;
@@ -298,7 +297,7 @@ impl Shardable for kvrpcpb::PrewriteRequest {
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);

// Only need to set secondary keys if we're sending the primary key.
if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
@@ -365,7 +364,7 @@ impl Shardable for kvrpcpb::CommitRequest {
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
self.keys = shard.into_iter().map(Into::into).collect();
Ok(())
}
@@ -456,7 +455,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
self.mutations = shard;
Ok(())
}
@@ -557,7 +556,7 @@ impl Shardable for kvrpcpb::ScanLockRequest {
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
self.start_key = shard.0;
Ok(())
}
@@ -618,7 +617,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
}

fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
assert!(shard.len() == 1);
self.primary_lock = shard.pop().unwrap();
Ok(())
@@ -676,7 +675,7 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest {
}

fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_context(store.region_with_leader.context()?);
self.context = Some(store.region_with_leader.context()?);
assert!(shard.len() == 1);
self.primary_key = shard.pop().unwrap();
Ok(())
11 changes: 3 additions & 8 deletions src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
@@ -2,11 +2,7 @@

use derive_new::new;
use log::debug;
use std::marker::PhantomData;

use crate::codec::ApiV1TxnCodec;
use crate::pd::{PdClient, PdRpcClient};
use crate::request::codec::Codec;
use crate::BoundRange;
use crate::Key;
use crate::KvPair;
@@ -22,12 +18,11 @@ use crate::Value;
///
/// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
#[derive(new)]
pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
transaction: Transaction<Cod, PdC>,
phantom: PhantomData<Cod>,
pub struct Snapshot {
transaction: Transaction,
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<Cod, PdC> {
impl Snapshot {
/// Get the value associated with the given key.
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking get request on snapshot");
49 changes: 16 additions & 33 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::iter;
use std::marker::PhantomData;
use std::sync::atomic;
use std::sync::atomic::AtomicU8;
use std::sync::Arc;
@@ -16,12 +15,10 @@ use tokio::time::Duration;

use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::codec::ApiV1TxnCodec;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::request::codec::{Codec, EncodedRequest};
use crate::request::Collect;
use crate::request::CollectError;
use crate::request::CollectSingle;
@@ -77,23 +74,22 @@ use crate::Value;
/// txn.commit().await.unwrap();
/// # });
/// ```
pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
pub struct Transaction<PdC: PdClient = PdRpcClient> {
status: Arc<AtomicU8>,
timestamp: Timestamp,
buffer: Buffer,
rpc: Arc<PdC>,
options: TransactionOptions,
is_heartbeat_started: bool,
start_instant: Instant,
phantom: PhantomData<Cod>,
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
impl<PdC: PdClient> Transaction<PdC> {
pub(crate) fn new(
timestamp: Timestamp,
rpc: Arc<PdC>,
options: TransactionOptions,
) -> Transaction<Cod, PdC> {
) -> Transaction<PdC> {
let status = if options.read_only {
TransactionStatus::ReadOnly
} else {
@@ -107,7 +103,6 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
options,
is_heartbeat_started: false,
start_instant: std::time::Instant::now(),
phantom: PhantomData,
}
}

@@ -140,8 +135,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
self.buffer
.get_or_else(key, |key| async move {
let request = new_get_request(key, timestamp);
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
let plan = PlanBuilder::new(rpc, encoded_req)
let plan = PlanBuilder::new(rpc, request)
.resolve_lock(retry_options.lock_backoff)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
@@ -271,8 +265,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
self.buffer
.batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move {
let request = new_batch_get_request(keys, timestamp);
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
let plan = PlanBuilder::new(rpc, encoded_req)
let plan = PlanBuilder::new(rpc, request)
.resolve_lock(retry_options.lock_backoff)
.retry_multi_region(retry_options.region_backoff)
.merge(Collect)
@@ -745,8 +738,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
primary_key,
self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
);
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.merge(CollectSingle)
@@ -776,8 +768,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
move |new_range, new_limit| async move {
let request =
new_scan_request(new_range, timestamp, new_limit, key_only, reverse);
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
let plan = PlanBuilder::new(rpc, encoded_req)
let plan = PlanBuilder::new(rpc, request)
.resolve_lock(retry_options.lock_backoff)
.retry_multi_region(retry_options.region_backoff)
.merge(Collect)
@@ -832,8 +823,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
for_update_ts.clone(),
need_value,
);
let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.preserve_shard()
.retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
@@ -887,8 +877,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
start_version,
for_update_ts,
);
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = PlanBuilder::new(self.rpc.clone(), req)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
@@ -957,8 +946,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
primary_key.clone(),
start_instant.elapsed().as_millis() as u64 + MAX_TTL,
);
let encoded_req = EncodedRequest::new(request, rpc.get_codec());
let plan = PlanBuilder::new(rpc.clone(), encoded_req)
let plan = PlanBuilder::new(rpc.clone(), request)
.retry_multi_region(region_backoff.clone())
.merge(CollectSingle)
.plan();
@@ -1005,7 +993,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
}
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC> {
impl<PdC: PdClient> Drop for Transaction<PdC> {
fn drop(&mut self) {
debug!("dropping transaction");
if std::thread::panicking() {
@@ -1296,8 +1284,7 @@ impl<PdC: PdClient> Committer<PdC> {
.collect();
// FIXME set max_commit_ts and min_commit_ts

let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = PlanBuilder::new(self.rpc.clone(), request)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.merge(CollectError)
@@ -1337,8 +1324,7 @@ impl<PdC: PdClient> Committer<PdC> {
self.start_version.clone(),
commit_version.clone(),
);
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
let plan = PlanBuilder::new(self.rpc.clone(), req)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
@@ -1402,8 +1388,7 @@ impl<PdC: PdClient> Committer<PdC> {
.filter(|key| &primary_key != key);
new_commit_request(keys, self.start_version, commit_version)
};
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc, encoded_req)
let plan = PlanBuilder::new(self.rpc, req)
.resolve_lock(self.options.retry_options.lock_backoff)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
@@ -1424,8 +1409,7 @@ impl<PdC: PdClient> Committer<PdC> {
match self.options.kind {
TransactionKind::Optimistic => {
let req = new_batch_rollback_request(keys, self.start_version);
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc, encoded_req)
let plan = PlanBuilder::new(self.rpc, req)
.resolve_lock(self.options.retry_options.lock_backoff)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
@@ -1434,8 +1418,7 @@ impl<PdC: PdClient> Committer<PdC> {
}
TransactionKind::Pessimistic(for_update_ts) => {
let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts);
let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
let plan = PlanBuilder::new(self.rpc, encoded_req)
let plan = PlanBuilder::new(self.rpc, req)
.resolve_lock(self.options.retry_options.lock_backoff)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()

0 comments on commit 4db9895

Please sign in to comment.