Skip to content

Commit

Permalink
fix encode range issue with unbound end
Browse files Browse the repository at this point in the history
Signed-off-by: iosmanthus <[email protected]>
  • Loading branch information
iosmanthus committed Jul 14, 2022
1 parent 182a457 commit 0e76e10
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
//! # })}
//! ```
#![feature(min_specialization)]
#[macro_use]
pub mod request;
#[macro_use]
Expand Down
19 changes: 13 additions & 6 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use crate::{
collect_first,
pd::PdClient,
request::{
codec::RequestCodec, plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor,
KvRequest, Merge, Process, Shardable, SingleKey,
codec::{RequestCodec, RequestCodecExt},
plan::ResponseWithShard,
Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_ranges, RegionStore},
transaction::HasLocks,
Expand Down Expand Up @@ -215,6 +216,7 @@ pub fn new_raw_scan_request(
req
}

has_reverse!(kvrpcpb::RawScanRequest);
impl_kv_request!(kvrpcpb::RawScanRequest; kvrpcpb::RawScanResponse, kvs);
shardable_range!(kvrpcpb::RawScanRequest);

Expand Down Expand Up @@ -244,10 +246,15 @@ pub fn new_raw_batch_scan_request(
req
}

impl_kv_request!(
kvrpcpb::RawBatchScanRequest, ranges;
kvrpcpb::RawBatchScanResponse, kvs
);
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawBatchScanRequest {
type Response = kvrpcpb::RawBatchScanResponse;
fn encode_request(mut self, codec: &C) -> Self {
*self.mut_ranges() = codec.encode_ranges(self.take_ranges(), self.get_reverse());
self
}

impl_decode_response! {kvs}
}

impl Shardable for kvrpcpb::RawBatchScanRequest {
type Shard = Vec<kvrpcpb::KeyRange>;
Expand Down
68 changes: 52 additions & 16 deletions src/request/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ type Prefix = [u8; KEYSPACE_PREFIX_LEN];

const KEYSPACE_PREFIX_LEN: usize = 4;

const MAX_KEYSPACE_ID: KeySpaceId = KeySpaceId([0xff, 0xff, 0xff]);

pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
fn encode_request<'a, R: KvRequest<Self>>(&self, req: &'a R) -> Cow<'a, R> {
Cow::Borrowed(req)
Expand All @@ -29,7 +27,7 @@ pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
Ok(())
}

fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>, _reverse: bool) -> (Vec<u8>, Vec<u8>) {
(start, end)
}

Expand Down Expand Up @@ -74,6 +72,10 @@ pub trait RequestCodecExt: RequestCodec {
keys.into_iter().map(|key| self.encode_key(key)).collect()
}

fn encode_secondaries(&self, secondaries: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
self.encode_keys(secondaries)
}

fn encode_pairs(&self, mut pairs: Vec<kvrpcpb::KvPair>) -> Vec<kvrpcpb::KvPair> {
for pair in pairs.iter_mut() {
*pair.mut_key() = self.encode_key(pair.take_key());
Expand All @@ -82,9 +84,14 @@ pub trait RequestCodecExt: RequestCodec {
pairs
}

fn encode_ranges(&self, mut ranges: Vec<kvrpcpb::KeyRange>) -> Vec<kvrpcpb::KeyRange> {
fn encode_ranges(
&self,
mut ranges: Vec<kvrpcpb::KeyRange>,
reverse: bool,
) -> Vec<kvrpcpb::KeyRange> {
for range in ranges.iter_mut() {
let (start, end) = self.encode_range(range.take_start_key(), range.take_end_key());
let (start, end) =
self.encode_range(range.take_start_key(), range.take_end_key(), reverse);
*range.mut_start_key() = start;
*range.mut_end_key() = end;
}
Expand Down Expand Up @@ -217,18 +224,18 @@ pub trait Mode: Clone + Copy + Sync + Send + 'static {
const MAX_KEY: &'static [u8] = &[Self::PREFIX + 1, 0, 0, 0];
}

#[derive(Clone, Copy)]
#[derive(Default, Clone, Copy)]
pub struct RawMode;

#[derive(Clone, Copy)]
#[derive(Default, Clone, Copy)]
pub struct TxnMode;

impl Mode for RawMode {
const PREFIX: u8 = b'r';
}

impl Mode for TxnMode {
const PREFIX: u8 = b't';
const PREFIX: u8 = b'x';
}

#[derive(Clone)]
Expand Down Expand Up @@ -263,18 +270,37 @@ impl RequestCodec for ApiV1<TxnMode> {

impl TxnCodec for ApiV1<TxnMode> {}

#[derive(Clone, Copy, Default)]
#[derive(Clone, Copy)]
pub struct KeySpace<M: Mode> {
id: KeySpaceId,
_phantom: PhantomData<M>,
}

impl<M: Mode> Default for KeySpace<M> {
fn default() -> Self {
KeySpace {
id: KeySpaceId::default(),
_phantom: PhantomData,
}
}
}

impl<M: Mode> From<KeySpace<M>> for Prefix {
fn from(s: KeySpace<M>) -> Self {
[M::PREFIX, s.id[0], s.id[1], s.id[2]]
}
}

impl<M: Mode> KeySpace<M> {
fn start(self) -> Prefix {
self.into()
}

fn end(self) -> Prefix {
(u32::from_be_bytes(self.into()) + 1).to_be_bytes()
}
}

#[derive(Default, Clone)]
pub struct ApiV2<M: Mode> {
keyspace: KeySpace<M>,
Expand Down Expand Up @@ -318,12 +344,21 @@ impl<M: Mode> RequestCodec for ApiV2<M> {
Ok(())
}

fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
if self.keyspace.id == MAX_KEYSPACE_ID {
(self.encode_key(start), M::MAX_KEY.to_vec())
} else {
(self.encode_key(start), self.encode_key(end))
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>, reverse: bool) -> (Vec<u8>, Vec<u8>) {
if reverse {
let (start, end) = self.encode_range(end, start, false);
return (end, start);
}

let start = self.encode_key(start);

let end = if end.is_empty() {
self.keyspace.end().into()
} else {
self.encode_key(end)
};

(start, end)
}

fn encode_pd_query(&self, key: Vec<u8>) -> Vec<u8> {
Expand All @@ -335,14 +370,15 @@ impl<M: Mode> RequestCodec for ApiV2<M> {
decode_bytes_in_place(region.mut_end_key(), false)?;

// Map the region's start key to the keyspace start key.
if region.get_start_key() < M::MIN_KEY {
if region.get_start_key() <= self.keyspace.start().as_slice() {
*region.mut_start_key() = vec![];
} else {
self.decode_key(region.mut_start_key())?;
}

// Map the region's end key to the keyspace end key.
if region.get_end_key().is_empty() || region.get_end_key() > M::MAX_KEY {
if region.get_end_key().is_empty() || region.get_end_key() >= self.keyspace.end().as_slice()
{
*region.mut_end_key() = vec![];
} else {
self.decode_key(region.mut_end_key())?;
Expand Down
27 changes: 24 additions & 3 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ impl<T: PartialEq + Default> IsDefault for T {
}
}

pub(crate) trait HasReverse {
fn has_reverse(&self) -> bool;
}

impl<T> HasReverse for T {
default fn has_reverse(&self) -> bool {
false
}
}

macro_rules! has_reverse {
($t:ty) => {
impl $crate::request::HasReverse for $t {
fn has_reverse(&self) -> bool {
self.get_reverse()
}
}
};
}

macro_rules! impl_decode_response {
($($o:ident)*) => {
fn decode_response(&self, codec: &C, mut resp: Self::Response) -> Result<Self::Response> {
Expand All @@ -75,7 +95,6 @@ macro_rules! impl_decode_response {
};
}

#[macro_export]
macro_rules! impl_kv_request {
($req:ty $(,$i:ident)+; $resp:ty $(,$o:ident)*) => {
impl<C> KvRequest<C> for $req
Expand All @@ -102,12 +121,14 @@ macro_rules! impl_kv_request {

($req:ty; $resp:ty $(,$o:ident)*) => {
impl<C> KvRequest<C> for $req
where C: RequestCodec
where C: RequestCodec,
$req: $crate::request::HasReverse
{
type Response = $resp;

fn encode_request(mut self, codec: &C) -> Self {
let (start, end) = codec.encode_range(self.take_start_key(), self.take_end_key());
use $crate::request::HasReverse;
let (start, end) = codec.encode_range(self.take_start_key(), self.take_end_key(), self.has_reverse());
*self.mut_start_key() = start;
*self.mut_end_key() = end;

Expand Down
6 changes: 5 additions & 1 deletion src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub fn new_scan_request(
req
}

has_reverse!(kvrpcpb::ScanRequest);
impl_kv_request!(kvrpcpb::ScanRequest; kvrpcpb::ScanResponse, pairs, error);
shardable_range!(kvrpcpb::ScanRequest);

Expand Down Expand Up @@ -233,7 +234,10 @@ pub fn new_pessimistic_prewrite_request(
req
}

impl_kv_request!(kvrpcpb::PrewriteRequest, mutations; kvrpcpb::PrewriteResponse, errors);
impl_kv_request!(
kvrpcpb::PrewriteRequest, mutations, primary_lock, secondaries;
kvrpcpb::PrewriteResponse, errors
);

impl Shardable for kvrpcpb::PrewriteRequest {
type Shard = Vec<kvrpcpb::Mutation>;
Expand Down

0 comments on commit 0e76e10

Please sign in to comment.