Skip to content

Commit c04fd41

Browse files
BusyJay and ti-chi-bot authored
raftclient: count term and index in estimated size (tikv#11493)
* add test case

Signed-off-by: Jay Lee <[email protected]>

* count term and index tag

If there are many entries in a message, the estimated size of a message can be way smaller than the actual size. This PR fixes the error by also counting index and term in the estimation. It also removes the hard limit, as the estimation is close enough.

Close tikv#9714.

Signed-off-by: Jay Lee <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
1 parent 237fe39 commit c04fd41

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

src/server/raft_client.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ impl Buffer for BatchMessageBuffer {
174174
let mut msg_size = msg.start_key.len()
175175
+ msg.end_key.len()
176176
+ msg.get_message().context.len()
177-
+ msg.extra_ctx.len();
177+
+ msg.extra_ctx.len()
178+
// index: 3, term: 2, data tag and size: 3, entry tag and size: 3
179+
+ 11 * msg.get_message().get_entries().len();
178180
for entry in msg.get_message().get_entries() {
179181
msg_size += entry.data.len();
180182
}
@@ -568,7 +570,6 @@ where
568570

569571
let cb = ChannelBuilder::new(self.builder.env.clone())
570572
.stream_initial_window_size(self.builder.cfg.grpc_stream_initial_window_size.0 as i32)
571-
.max_send_message_len(self.builder.cfg.max_grpc_send_msg_len)
572573
.keepalive_time(self.builder.cfg.grpc_keepalive_time.0)
573574
.keepalive_timeout(self.builder.cfg.grpc_keepalive_timeout.0)
574575
.default_compression_algorithm(self.builder.cfg.grpc_compression_algorithm())

tests/integrations/server/raft_client.rs

+39
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,45 @@ fn test_batch_size_limit() {
217217
assert_eq!(msg_count.load(Ordering::SeqCst), 10);
218218
}
219219

220+
/// In edge case that the estimated size may be inaccurate, we need to ensure connection
221+
/// will not be broken in this case.
222+
#[test]
223+
fn test_batch_size_edge_limit() {
224+
let msg_count = Arc::new(AtomicUsize::new(0));
225+
let batch_msg_count = Arc::new(AtomicUsize::new(0));
226+
let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), true);
227+
let (mock_server, port) = create_mock_server(service, 60200, 60300).unwrap();
228+
229+
let mut raft_client = get_raft_client_by_port(port);
230+
231+
// Put them in the buffer so sibling messages will likely be batched during sending.
232+
let mut msgs = Vec::with_capacity(5);
233+
for _ in 0..5 {
234+
let mut raft_m = RaftMessage::default();
235+
// Magic number, this can make estimated size about 4940000, hence two messages will be
236+
// batched together, but the total size will be way larger than 10MiB as there are many
237+
// indexes and terms.
238+
for _ in 0..38000 {
239+
let mut e = Entry::default();
240+
e.set_term(1);
241+
e.set_index(256);
242+
e.set_data(vec![b'a'; 130].into());
243+
raft_m.mut_message().mut_entries().push(e);
244+
}
245+
msgs.push(raft_m);
246+
}
247+
for m in msgs {
248+
raft_client.send(m).unwrap();
249+
}
250+
raft_client.flush();
251+
252+
check_msg_count(10000, &msg_count, 5);
253+
// The final received message count should be 5 exactly.
254+
drop(raft_client);
255+
drop(mock_server);
256+
assert_eq!(msg_count.load(Ordering::SeqCst), 5);
257+
}
258+
220259
// Try to create a mock server with `service`. The server will be bound with a random
221260
// port chosen between [`min_port`, `max_port`]. Return `None` if no port is available.
222261
fn create_mock_server<T>(service: T, min_port: u16, max_port: u16) -> Option<(Server, u16)>

0 commit comments

Comments
 (0)