From 0e76e1089a63d3a864b55fd945bc83c3a97a4139 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Thu, 14 Jul 2022 20:17:45 +0800 Subject: [PATCH] fix encode range issue with unbound end Signed-off-by: iosmanthus --- src/lib.rs | 1 + src/raw/requests.rs | 19 +++++++---- src/request/codec.rs | 68 ++++++++++++++++++++++++++++--------- src/request/mod.rs | 27 +++++++++++++-- src/transaction/requests.rs | 6 +++- 5 files changed, 95 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7a1c5b9f..815bf9ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,6 +92,7 @@ //! # })} //! ``` +#![feature(min_specialization)] #[macro_use] pub mod request; #[macro_use] diff --git a/src/raw/requests.rs b/src/raw/requests.rs index e2053f09..40676b2a 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -13,8 +13,9 @@ use crate::{ collect_first, pd::PdClient, request::{ - codec::RequestCodec, plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, - KvRequest, Merge, Process, Shardable, SingleKey, + codec::{RequestCodec, RequestCodecExt}, + plan::ResponseWithShard, + Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey, }, store::{store_stream_for_keys, store_stream_for_ranges, RegionStore}, transaction::HasLocks, @@ -215,6 +216,7 @@ pub fn new_raw_scan_request( req } +has_reverse!(kvrpcpb::RawScanRequest); impl_kv_request!(kvrpcpb::RawScanRequest; kvrpcpb::RawScanResponse, kvs); shardable_range!(kvrpcpb::RawScanRequest); @@ -244,10 +246,15 @@ pub fn new_raw_batch_scan_request( req } -impl_kv_request!( - kvrpcpb::RawBatchScanRequest, ranges; - kvrpcpb::RawBatchScanResponse, kvs -); +impl KvRequest for kvrpcpb::RawBatchScanRequest { + type Response = kvrpcpb::RawBatchScanResponse; + fn encode_request(mut self, codec: &C) -> Self { + *self.mut_ranges() = codec.encode_ranges(self.take_ranges(), self.get_reverse()); + self + } + + impl_decode_response! {kvs} +} impl Shardable for kvrpcpb::RawBatchScanRequest { type Shard = Vec; diff --git a/src/request/codec.rs b/src/request/codec.rs index 3cdf9595..d90c3f22 100644 --- a/src/request/codec.rs +++ b/src/request/codec.rs @@ -14,8 +14,6 @@ type Prefix = [u8; KEYSPACE_PREFIX_LEN]; const KEYSPACE_PREFIX_LEN: usize = 4; -const MAX_KEYSPACE_ID: KeySpaceId = KeySpaceId([0xff, 0xff, 0xff]); - pub trait RequestCodec: Sized + Clone + Sync + Send + 'static { fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> { Cow::Borrowed(req) @@ -29,7 +27,7 @@ pub trait RequestCodec: Sized + Clone + Sync + Send + 'static { Ok(()) } - fn encode_range(&self, start: Vec, end: Vec) -> (Vec, Vec) { + fn encode_range(&self, start: Vec, end: Vec, _reverse: bool) -> (Vec, Vec) { (start, end) } @@ -74,6 +72,10 @@ pub trait RequestCodecExt: RequestCodec { keys.into_iter().map(|key| self.encode_key(key)).collect() } + fn encode_secondaries(&self, secondaries: Vec>) -> Vec> { + self.encode_keys(secondaries) + } + fn encode_pairs(&self, mut pairs: Vec) -> Vec { for pair in pairs.iter_mut() { *pair.mut_key() = self.encode_key(pair.take_key()); @@ -82,9 +84,14 @@ pub trait RequestCodecExt: RequestCodec { pairs } - fn encode_ranges(&self, mut ranges: Vec) -> Vec { + fn encode_ranges( + &self, + mut ranges: Vec, + reverse: bool, + ) -> Vec { for range in ranges.iter_mut() { - let (start, end) = self.encode_range(range.take_start_key(), range.take_end_key()); + let (start, end) = + self.encode_range(range.take_start_key(), range.take_end_key(), reverse); *range.mut_start_key() = start; *range.mut_end_key() = end; } @@ -217,10 +224,10 @@ pub trait Mode: Clone + Copy + Sync + Send + 'static { const MAX_KEY: &'static [u8] = &[Self::PREFIX + 1, 0, 0, 0]; } -#[derive(Clone, Copy)] +#[derive(Default, Clone, Copy)] pub struct RawMode; -#[derive(Clone, Copy)] +#[derive(Default, Clone, Copy)] pub struct TxnMode; impl Mode for RawMode { @@ -228,7 +235,7 @@ impl Mode for RawMode { } impl Mode for TxnMode { - const PREFIX: u8 = b't'; + const PREFIX: u8 = b'x'; } #[derive(Clone)] @@ -263,18 +270,37 @@ impl RequestCodec for ApiV1 { impl TxnCodec for ApiV1 {} -#[derive(Clone, Copy, Default)] +#[derive(Clone, Copy)] pub struct KeySpace { id: KeySpaceId, _phantom: PhantomData, } +impl Default for KeySpace { + fn default() -> Self { + KeySpace { + id: KeySpaceId::default(), + _phantom: PhantomData, + } + } +} + impl From> for Prefix { fn from(s: KeySpace) -> Self { [M::PREFIX, s.id[0], s.id[1], s.id[2]] } } +impl KeySpace { + fn start(self) -> Prefix { + self.into() + } + + fn end(self) -> Prefix { + (u32::from_be_bytes(self.into()) + 1).to_be_bytes() + } +} + #[derive(Default, Clone)] pub struct ApiV2 { keyspace: KeySpace, @@ -318,12 +344,21 @@ impl RequestCodec for ApiV2 { Ok(()) } - fn encode_range(&self, start: Vec, end: Vec) -> (Vec, Vec) { - if self.keyspace.id == MAX_KEYSPACE_ID { - (self.encode_key(start), M::MAX_KEY.to_vec()) - } else { - (self.encode_key(start), self.encode_key(end)) + fn encode_range(&self, start: Vec, end: Vec, reverse: bool) -> (Vec, Vec) { + if reverse { + let (start, end) = self.encode_range(end, start, false); + return (end, start); } + + let start = self.encode_key(start); + + let end = if end.is_empty() { + self.keyspace.end().into() + } else { + self.encode_key(end) + }; + + (start, end) } fn encode_pd_query(&self, key: Vec) -> Vec { @@ -335,14 +370,15 @@ impl RequestCodec for ApiV2 { decode_bytes_in_place(region.mut_end_key(), false)?; // Map the region's start key to the keyspace start key. - if region.get_start_key() < M::MIN_KEY { + if region.get_start_key() <= self.keyspace.start().as_slice() { *region.mut_start_key() = vec![]; } else { self.decode_key(region.mut_start_key())?; } // Map the region's end key to the keyspace end key. - if region.get_end_key().is_empty() || region.get_end_key() > M::MAX_KEY { + if region.get_end_key().is_empty() || region.get_end_key() >= self.keyspace.end().as_slice() + { *region.mut_end_key() = vec![]; } else { self.decode_key(region.mut_end_key())?; diff --git a/src/request/mod.rs b/src/request/mod.rs index c8973039..96152f1a 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -51,6 +51,26 @@ impl IsDefault for T { } } +pub(crate) trait HasReverse { + fn has_reverse(&self) -> bool; +} + +impl HasReverse for T { + default fn has_reverse(&self) -> bool { + false + } +} + +macro_rules! has_reverse { + ($t:ty) => { + impl $crate::request::HasReverse for $t { + fn has_reverse(&self) -> bool { + self.get_reverse() + } + } + }; +} + macro_rules! impl_decode_response { ($($o:ident)*) => { fn decode_response(&self, codec: &C, mut resp: Self::Response) -> Result { @@ -75,7 +95,6 @@ macro_rules! impl_decode_response { }; } -#[macro_export] macro_rules! impl_kv_request { ($req:ty $(,$i:ident)+; $resp:ty $(,$o:ident)*) => { impl KvRequest for $req @@ -102,12 +121,14 @@ macro_rules! impl_kv_request { ($req:ty; $resp:ty $(,$o:ident)*) => { impl KvRequest for $req - where C: RequestCodec + where C: RequestCodec, + $req: $crate::request::HasReverse { type Response = $resp; fn encode_request(mut self, codec: &C) -> Self { - let (start, end) = codec.encode_range(self.take_start_key(), self.take_end_key()); + use $crate::request::HasReverse; + let (start, end) = codec.encode_range(self.take_start_key(), self.take_end_key(), self.has_reverse()); *self.mut_start_key() = start; *self.mut_end_key() = end; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 09573b38..673c7840 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -141,6 +141,7 @@ pub fn new_scan_request( req } +has_reverse!(kvrpcpb::ScanRequest); impl_kv_request!(kvrpcpb::ScanRequest; kvrpcpb::ScanResponse, pairs, error); shardable_range!(kvrpcpb::ScanRequest); @@ -233,7 +234,10 @@ pub fn new_pessimistic_prewrite_request( req } -impl_kv_request!(kvrpcpb::PrewriteRequest, mutations; kvrpcpb::PrewriteResponse, errors); +impl_kv_request!( + kvrpcpb::PrewriteRequest, mutations, primary_lock, secondaries; + kvrpcpb::PrewriteResponse, errors +); impl Shardable for kvrpcpb::PrewriteRequest { type Shard = Vec;