From a9b002759d1358be5a48210adb9328568fe8bb20 Mon Sep 17 00:00:00 2001 From: Andrey Koshchiy Date: Wed, 8 Feb 2023 14:36:14 +0400 Subject: [PATCH] Batch split for prewrite and commit requests (#390) Signed-off-by: Andrey Koshchiy --- config/tikv.toml | 1 + src/request/mod.rs | 2 +- src/request/shard.rs | 95 ++++++++++++++++++++++++++++++++++ src/transaction/requests.rs | 69 ++++++++++++++++++++++-- src/transaction/transaction.rs | 2 +- tests/integration_tests.rs | 38 ++++++++++++++ 6 files changed, 201 insertions(+), 6 deletions(-) diff --git a/config/tikv.toml b/config/tikv.toml index 0337dd7d..52965253 100644 --- a/config/tikv.toml +++ b/config/tikv.toml @@ -8,6 +8,7 @@ region-split-check-diff = "1B" pd-heartbeat-tick-interval = "2s" pd-store-heartbeat-tick-interval = "5s" split-region-check-tick-interval = "1s" +raft-entry-max-size = "1MB" [rocksdb] max-open-files = 10000 diff --git a/src/request/mod.rs b/src/request/mod.rs index 959ff5e7..6572aa4c 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -15,7 +15,7 @@ pub use self::{ ResponseWithShard, RetryableMultiRegion, }, plan_builder::{PlanBuilder, SingleKey}, - shard::Shardable, + shard::{Batchable, HasNextBatch, NextBatch, Shardable}, }; pub mod plan; diff --git a/src/request/shard.rs b/src/request/shard.rs index cdf3f198..1ec87b07 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -38,6 +38,44 @@ pub trait Shardable { fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>; } +pub trait Batchable { + type Item; + + fn batches(items: Vec, batch_size: u64) -> Vec> { + let mut batches: Vec> = Vec::new(); + let mut batch: Vec = Vec::new(); + let mut size = 0; + + for item in items { + let item_size = Self::item_size(&item); + if size + item_size >= batch_size && !batch.is_empty() { + batches.push(batch); + batch = Vec::new(); + size = 0; + } + size += item_size; + batch.push(item); + } + if !batch.is_empty() { + batches.push(batch) + } + batches + } + + fn item_size(item: &Self::Item) -> u64; +} + +// Use to iterate in a region for scan requests that have batch size limit. +// HasNextBatch use to get the next batch according to previous response. +pub trait HasNextBatch { + fn has_next_batch(&self) -> Option<(Vec, Vec)>; +} + +// NextBatch use to change start key of request by result of `has_next_batch`. +pub trait NextBatch { + fn next_batch(&mut self, _range: (Vec, Vec)); +} + impl Shardable for Dispatch { type Shard = Req::Shard; @@ -167,3 +205,60 @@ macro_rules! shardable_range { } }; } + +#[cfg(test)] +mod test { + use rand::{thread_rng, Rng}; + + use super::Batchable; + + #[test] + fn test_batches() { + let mut rng = thread_rng(); + + let items: Vec<_> = (0..3) + .map(|_| (0..2).map(|_| rng.gen::()).collect::>()) + .collect(); + + let batch_size = 5; + + let batches = BatchableTest::batches(items.clone(), batch_size); + + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].len(), 2); + assert_eq!(batches[1].len(), 1); + assert_eq!(batches[0][0], items[0]); + assert_eq!(batches[0][1], items[1]); + assert_eq!(batches[1][0], items[2]); + } + + #[test] + fn test_batches_big_item() { + let mut rng = thread_rng(); + + let items: Vec<_> = (0..3) + .map(|_| (0..3).map(|_| rng.gen::()).collect::>()) + .collect(); + + let batch_size = 2; + + let batches = BatchableTest::batches(items.clone(), batch_size); + + assert_eq!(batches.len(), 3); + for i in 0..items.len() { + let batch = &batches[i]; + assert_eq!(batch.len(), 1); + assert_eq!(batch[0], items[i]); + } + } + + struct BatchableTest; + + impl Batchable for BatchableTest { + type Item = Vec; + + fn item_size(item: &Self::Item) -> u64 { + item.len() as u64 + } + } +} diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 3965e148..d092334a 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -4,8 +4,8 @@ use crate::{ collect_first, pd::PdClient, request::{ - Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process, - ResponseWithShard, Shardable, SingleKey, + Batchable, Collect, CollectSingle, CollectWithShard, DefaultProcessor, + KvRequest, Merge, Process, ResponseWithShard, Shardable, SingleKey, }, store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore}, timestamp::TimestampExt, @@ -14,7 +14,10 @@ use crate::{ Key, KvPair, Result, Value, }; use either::Either; -use futures::stream::BoxStream; +use futures::{ + stream::{self, BoxStream}, + StreamExt, +}; use std::{collections::HashMap, iter, sync::Arc}; use tikv_client_common::Error::PessimisticLockError; use tikv_client_proto::{ @@ -22,6 +25,8 @@ use tikv_client_proto::{ pdpb::Timestamp, }; +use super::transaction::TXN_COMMIT_BATCH_SIZE; + // implement HasLocks for a response type that has a `pairs` field, // where locks can be extracted from both the `pairs` and `error` fields macro_rules! pair_locks { @@ -251,7 +256,18 @@ impl Shardable for kvrpcpb::PrewriteRequest { let mut mutations = self.mutations.clone(); mutations.sort_by(|a, b| a.key.cmp(&b.key)); log::debug!("PrewriteRequest shards"); + store_stream_for_keys(mutations.into_iter(), pd_client.clone()) + .flat_map(|result| match result { + Ok((mutations, store)) => stream::iter(kvrpcpb::PrewriteRequest::batches( + mutations, + TXN_COMMIT_BATCH_SIZE, + )) + .map(move |batch| Ok((batch, store.clone()))) + .boxed(), + Err(e) => stream::iter(Err(e)).boxed(), + }) + .boxed() } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { @@ -272,6 +288,16 @@ impl Shardable for kvrpcpb::PrewriteRequest { } } +impl Batchable for kvrpcpb::PrewriteRequest { + type Item = kvrpcpb::Mutation; + + fn item_size(item: &Self::Item) -> u64 { + let mut size = item.get_key().len() as u64; + size += item.get_value().len() as u64; + size + } +} + pub fn new_commit_request( keys: Vec>, start_version: u64, @@ -289,7 +315,42 @@ impl KvRequest for kvrpcpb::CommitRequest { type Response = kvrpcpb::CommitResponse; } -shardable_keys!(kvrpcpb::CommitRequest); +impl Shardable for kvrpcpb::CommitRequest { + type Shard = Vec>; + + fn shards( + &self, + pd_client: &Arc, + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + let mut keys = self.keys.clone(); + keys.sort(); + + store_stream_for_keys(keys.into_iter(), pd_client.clone()) + .flat_map(|result| match result { + Ok((keys, store)) => { + stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE)) + .map(move |batch| Ok((batch, store.clone()))) + .boxed() + } + Err(e) => stream::iter(Err(e)).boxed(), + }) + .boxed() + } + + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); + self.set_keys(shard.into_iter().map(Into::into).collect()); + Ok(()) + } +} + +impl Batchable for kvrpcpb::CommitRequest { + type Item = Vec; + + fn item_size(item: &Self::Item) -> u64 { + item.len() as u64 + } +} pub fn new_batch_rollback_request( keys: Vec>, diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index bc8d071c..7889e136 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1108,7 +1108,7 @@ const DEFAULT_LOCK_TTL: u64 = 3000; const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(MAX_TTL / 2); /// TiKV recommends each RPC packet should be less than around 1MB. We keep KV size of /// each request below 16KB. -const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024; +pub const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024; const TTL_FACTOR: f64 = 6000.0; /// Optimistic or pessimistic transaction. diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 9d174aad..91ef04e4 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -175,6 +175,44 @@ async fn txn_pessimistic() -> Result<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn txn_split_batch() -> Result<()> { + init().await?; + + let client = TransactionClient::new(pd_addrs(), None).await?; + let mut txn = client.begin_optimistic().await?; + let mut rng = thread_rng(); + + // testing with raft-entry-max-size = "1MB" + let keys_count: usize = 1000; + let val_len = 15000; + + let values: Vec<_> = (0..keys_count) + .map(|_| (0..val_len).map(|_| rng.gen::()).collect::>()) + .collect(); + + for (i, value) in values.iter().enumerate() { + let key = Key::from(i.to_be_bytes().to_vec()); + txn.put(key, value.clone()).await?; + } + + txn.commit().await?; + + let mut snapshot = client.snapshot( + client.current_timestamp().await?, + TransactionOptions::new_optimistic(), + ); + + for (i, value) in values.iter().enumerate() { + let key = Key::from(i.to_be_bytes().to_vec()); + let from_snapshot = snapshot.get(key).await?.unwrap(); + assert_eq!(from_snapshot, value.clone()); + } + + Ok(()) +} + /// bank transfer mainly tests raw put and get #[tokio::test] #[serial]